1 #ifndef STREAMITER__HPP
2 #define STREAMITER__HPP
3 
4 /*  $Id: streamiter.hpp 609562 2020-06-03 19:09:59Z whlavina $
5 * ===========================================================================
6 *
7 *                            PUBLIC DOMAIN NOTICE
8 *               National Center for Biotechnology Information
9 *
10 *  This software/database is a "United States Government Work" under the
11 *  terms of the United States Copyright Act.  It was written as part of
12 *  the author's official duties as a United States Government employee and
13 *  thus cannot be copyrighted.  This software/database is freely available
14 *  to the public for use. The National Library of Medicine and the U.S.
15 *  Government have not placed any restriction on its use or reproduction.
16 *
17 *  Although all reasonable efforts have been taken to ensure the accuracy
18 *  and reliability of the software and data, the NLM and the U.S.
19 *  Government do not and cannot warrant the performance or results that
20 *  may be obtained by using this software or data. The NLM and the U.S.
21 *  Government disclaim all warranties, express or implied, including
22 *  warranties of performance, merchantability or fitness for any particular
23 *  purpose.
24 *
25 *  Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Andrei Gourianov, Alexander Astashyn
30 *
31 * File Description:
32 *   Input stream iterators
33 * Please note:
34 *   This API requires multi-threading
35 */
36 
37 #include <corelib/ncbistd.hpp>
38 #include <corelib/ncbithr.hpp>
39 #include <serial/objistr.hpp>
40 #include <serial/objectio.hpp>
41 
42 #include <queue>
43 #include <future>
44 #include <thread>
45 #include <mutex>
46 #include <condition_variable>
47 
48 
49 /** @addtogroup ObjStreamSupport
50  *
51  * @{
52  */
53 
54 
55 BEGIN_NCBI_SCOPE
56 
57 #if defined(NCBI_THREADS)
58 
59 namespace ns_ObjectIStreamFilterIterator {
60 template<typename TRoot>
61 TMemberIndex xxx_MemberIndex(const string& mem_name);
62 }
63 template<typename...>
64 class CObjectIStreamAsyncIterator;
65 
66 /////////////////////////////////////////////////////////////////////////////
67 ///  CObjectIStreamIterator
68 ///
69 ///  Synchronously read multiple same-type data objects from an input stream
70 ///  with optional filtering.
71 ///  @sa CObjectIStreamAsyncIterator
72 ///
73 ///  The algorithm assumes that the input stream on its top level consists
74 ///  exclusively of one or more serial objects of type TRoot.
75 ///
76 ///  There are two flavors of this template:
77 ///  - CObjectIStreamIterator<TRoot> - iterate through the top-level serial
78 ///    objects of type TRoot.
79 ///  - CObjectIStreamIterator<TRoot, TChild> - iterate through serial objects
80 ///    of type TChild which are contained within the top-level serial objects
81 ///    of type TRoot.
82 ///
83 ///  Usage:
84 ///  @code
85 ///
86 ///  CObjectIStream istr ....;
87 ///
88 ///  for (CSeq_entry& obj : CObjectIStreamIterator<CSeq_entry>(istr)) {
89 ///      // ...do something with "obj" here...
90 ///  }
91 ///
92 ///  for (CBioseq& obj : CObjectIStreamIterator<CSeq_entry,CBioseq>(istr)) {
93 ///      // ...do something with "obj" here...
94 ///  }
95 ///
96 ///
97 ///  CObjectIStreamIterator<CSeq_entry> it(istr);
98 ///  CObjectIStreamIterator<CSeq_entry> eos;
99 ///  for_each (it, eos, [](CSeq_entry& obj) { ... });
100 ///
101 ///  CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
102 ///  CObjectIStreamIterator<CSeq_entry,CBioseq> eos;
103 ///  for_each (it, eos, [](CBioseq& obj) { ... });
104 ///
105 ///
106 ///  for (CObjectIStreamIterator<CSeq_entry> it(istr);  it.IsValid();  ++it) {
107 ///      CSeq_entry& obj = *it;
108 ///      // ...do something with "obj" here...
109 ///  }
110 ///
111 ///  for (CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
112 ///       it.IsValid();  ++it) {
113 ///      CRef<CBioseq> obj(&*it);
114 ///      // ...do something with "obj" here...
115 ///  }
116 ///
117 ///  for (CObjectIStreamIterator<CSeq_entry, string> it(istr);
118 ///       it.IsValid();  ++it) {
119 ///      string& obj = *it;
120 ///  }
121 ///
122 ///  with filtering (only CTaxon1_data objects with optional 'org' member set are valid):
123 ///    CObjectIStreamIterator<CTaxon1_data> i(istr, eNoOwnership,
124 ///        CObjectIStreamIterator<CTaxon1_data>::CParams().FilterByMember("org",
125 ///            [](const CObjectIStream& istr, CTaxon1_data& obj,
126 ///               TMemberIndex mem_index, CObjectInfo* mem, void* extra)->bool {
127 ///               return mem != nullptr;
128 ///            }));
129 ///
130 ///  @endcode
131 ///
132 ///  @attention
133 ///   Input iterators only guarantee validity for single pass algorithms:
134 ///   once an input iterator has been incremented, all copies of its previous
135 ///   value may be invalidated. It is still possible to keep data objects
136 ///   for future use by placing them into CRef containers, when applicable.
137 
138 template<typename...>
139 class CObjectIStreamIterator
140 {
141 public:
142 
143     /// Object member filtering function
144     ///
145     /// @param istr
146     ///   Serial object stream
147     /// @param obj
148     ///   Object being checked. It is being populated and is incomplete.
149     /// @param mem_index
150     ///   Member index
151     /// @param mem
152     ///   Member information. If mem is nullptr, the member is missing in the stream.
153     /// @param extra
154     ///   Extra information provided by the caller when constructing iterator.
155     ///
156     /// @attention
157     ///   When using filtering with CObjectIStreamAsyncIterator, please note
158     ///   that the function may be called from different threads.
159     ///   Synchronization of access to shared data, if required, is the responsibility of the client.
160     template<typename TObj>
161     using FMemberFilter = function<bool(const CObjectIStream& istr, TObj& obj,
162                                         TMemberIndex mem_index, CObjectInfo* mem,
163                                         void* extra)>;
164     /// Filtering parameters
165     template<typename TObj>
166     class CParams
167     {
168     public:
CParams(void)169         CParams(void)
170             : m_Index(kInvalidMember)
171             , m_FnFilter(nullptr)
172             , m_Extra(nullptr) {
173         }
174         /// Filter by member index
FilterByMember(TMemberIndex index,FMemberFilter<TObj> fn,void * extra=nullptr)175         CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
176             m_Index = index; m_FnFilter = fn; m_Extra = extra; return *this;
177         }
178         /// Filter by member name
FilterByMember(const string & mem_name,FMemberFilter<TObj> fn,void * extra=nullptr)179         CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
180             m_Index = ns_ObjectIStreamFilterIterator::xxx_MemberIndex<TObj>(mem_name);
181             m_FnFilter = fn; m_Extra = extra; return *this;
182         }
183 
184     private:
185 //        void xxx_MemberIndex(const string& mem_name);
186         TMemberIndex        m_Index;
187         FMemberFilter<TObj> m_FnFilter;
188         void*               m_Extra;
189         template<typename...> friend class CObjectIStreamIterator;
190         template<typename...> friend class CObjectIStreamAsyncIterator;
191     };
192 
193     /// Construct iterator upon an object serialization stream
194     ///
195     /// @param istr
196     ///   Serial object stream
197     /// @param own_istr
198     ///   eTakeOwnership means that the input stream will be deleted
199     ///   automatically when the iterator gets destroyed
200     /// @param params
201     ///   Filtering parameters (default is no filtering)
202     template<typename TObj>
203     CObjectIStreamIterator( CObjectIStream& istr,
204                             EOwnership deleteInStream   = eNoOwnership,
205                             const CParams<TObj>& params = CParams<TObj>()) = delete;
206 
207     /// Construct end-of-stream (invalid) iterator
208     /// @sa IsValid()
209     CObjectIStreamIterator(void) = delete;
210 
211     // Copy-ctor and assignment
212     CObjectIStreamIterator(const CObjectIStreamIterator&);
213     CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);
214 
215     /// Advance to the next data object
216     CObjectIStreamIterator& operator++(void);
217 
218     // Comparison
219     bool operator==(const CObjectIStreamIterator&) const;
220     bool operator!=(const CObjectIStreamIterator&) const;
221 
222     /// Check whether the iterator points to a data
223     /// TRUE if the iterator is constructed upon a serialization stream AND
224     /// if it's not end-of-stream or error-in-stream
225     bool IsValid(void) const;
226 
227     /// Return the underlying serial object stream
228     const CObjectIStream& GetObjectIStream(void) const;
229 
230     /// Return data object which is currently pointed to by the iterator.
231     /// Throw an exception is the iterator does not point to a data, i.e.
232     /// if IsValid() is FALSE.
233     template<typename TObj>  TObj& operator*();
234 
235     /// Return pointer to data object which is currently pointed to by the
236     /// iterator.
237     /// Return NULL is the iterator does not point to a data, i.e.
238     /// if IsValid() is FALSE.
239     template<typename TObj>  TObj* operator->();
240 
241     /// Return self
242     CObjectIStreamIterator& begin(void);
243 
244     /// Construct and return end-of-stream iterator
245     CObjectIStreamIterator  end(void);
246 
247     // dtor
248     ~CObjectIStreamIterator();
249 };
250 
251 
252 /////////////////////////////////////////////////////////////////////////////
253 ///   CObjectIStreamAsyncIterator
254 ///
255 ///  Asynchronously read multiple same-type data objects from an input stream
256 ///  with optional filtering
257 ///  @sa CObjectIStreamIterator
258 ///
259 ///  The algorithm assumes that the input stream on its top level consists
260 ///  exclusively of one or more serial objects of type TRoot.
261 ///
262 ///  There are two flavors of this template:
263 ///  - CObjectIStreamAsyncIterator<TRoot> - iterate through the top-level
264 ///    serial objects of type TRoot.
265 ///  - CObjectIStreamAsyncIterator<TRoot, TChild> - iterate through serial
266 ///    objects of type TChild which are contained within the top-level serial
267 ///    objects of type TRoot.
268 ///
269 ///  @attention
270 ///    This iterator supports only the TChild types that are derived from
271 ///    CSerialObject class
272 ///
273 ///  Usage:
274 ///  @code
275 ///
276 ///  CObjectIStream istr ....;
277 ///
278 ///  for (CSeq_entry& obj : CObjectIStreamAsyncIterator<CSeq_entry>(istr))
279 ///  {
280 ///      // ...do something with "obj" here...
281 ///  }
282 ///
283 ///  for (CBioseq& obj : CObjectIStreamAsyncIterator<CSeq_entry,CBioseq>(istr))
284 ///  {
285 ///      // ...do something with "obj" here...
286 ///  }
287 ///
288 ///
289 ///  CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
290 ///  CObjectIStreamAsyncIterator<CSeq_entry> eos;
291 ///  for_each (it, eos, [](CSeq_entry& obj) { ... });
292 ///
293 ///  CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
294 ///  CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> eos;
295 ///  for_each (it, eos, [](CBioseq& obj) { ... });
296 ///
297 ///
298 ///  for (CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
299 ///       it.IsValid();  ++it) {
300 ///      CSeq_entry& obj = *it;
301 ///      // ...do something with "obj" here...
302 ///  }
303 ///
304 ///  for (CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
305 ///       it.IsValid();  ++it) {
306 ///      CRef<CBioseq> obj(&*it);
307 ///      // ...do something with "obj" here...
308 ///  }
309 ///
310 ///  @endcode
311 ///
312 ///  To speed up reading, the iterator offloads data reading, pre-parsing and
313 ///  parsing into separate threads. If the data stream contains numerous TRoot
314 ///  data records CObjectIStreamAsyncIterator can give up to 2-4 times speed-up
315 ///  (wall-clock wise) comparing to the synchronous processing (such as with
316 ///  CObjectIStreamIterator) of the same data.
317 ///
318 ///  The reader has to read the whole object into memory. If such objects are
319 ///  relatively small, then there will be several objects read into a single
320 ///  buffer, which is good. If data object is big it still goes into a single
321 ///  buffer no matter how big the object is.
322 ///  To limit memory consumption, use MaxTotalRawSize parameter.
323 ///
324 ///  The iterator does its job asynchronously. It starts working immediately
325 ///  after its creation and stops only when it is destroyed.
326 ///  Even if you do not use it, it still works in the background, reading and
327 ///  parsing the data.
328 ///
329 ///  @attention
330 ///   Input iterators only guarantee validity for single pass algorithms:
331 ///   once an input iterator has been incremented, all copies of its previous
332 ///   value may be invalidated. It is still possible to keep data objects
333 ///   for future use by placing them into CRef containers, when applicable.
334 
335 template<typename...>
336 class CObjectIStreamAsyncIterator
337 {
338 public:
339 
340     /// Asynchronous parsing parameters
341     template<typename TObj>
342     class CParams : public CObjectIStreamIterator<>::CParams<TObj>
343     {
344     public:
345         using CParent       = CObjectIStreamIterator<>::CParams<TObj>;
346         template<typename TR>
347         using FMemberFilter = CObjectIStreamIterator<>::FMemberFilter<TR>;
348 
CParams(void)349         CParams(void)
350             : m_ThreadPolicy(launch::async)
351             , m_MaxParserThreads (16)
352             , m_MaxTotalRawSize  (16 * 1024 * 1024)
353             , m_MinRawBufferSize (128 * 1024)
354             , m_SameThread(false) {
355         }
356 
357         /// Filter by member index
FilterByMember(TMemberIndex index,FMemberFilter<TObj> fn,void * extra=nullptr)358         CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
359             CParent::FilterByMember(index, fn, extra); return *this;
360         }
361 
362         /// Filter by member name
FilterByMember(const string & mem_name,FMemberFilter<TObj> fn,void * extra=nullptr)363         CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
364             CParent::FilterByMember(mem_name, fn, extra); return *this;
365         }
366 
367         /// Parsing thread launch policy
LaunchPolicy(launch policy)368         CParams& LaunchPolicy(launch policy) {
369             m_ThreadPolicy = policy; return *this;
370         }
371 
372         /// Maximum number of parsing threads
MaxParserThreads(unsigned max_parser_threads)373         CParams& MaxParserThreads(unsigned max_parser_threads) {
374             m_MaxParserThreads = max_parser_threads;  return *this;
375         }
376 
377         /// Total size of raw data buffers is allowed to grow to this value
MaxTotalRawSize(size_t max_total_raw_size)378         CParams& MaxTotalRawSize(size_t max_total_raw_size) {
379             m_MaxTotalRawSize = max_total_raw_size;  return *this;
380         }
381 
382         /// Single raw data memory buffer size should be at least this big
MinRawBufferSize(size_t min_raw_buffer_size)383         CParams& MinRawBufferSize(size_t min_raw_buffer_size) {
384             m_MinRawBufferSize = min_raw_buffer_size;  return *this;
385         }
386 
387         /// Raw data read and its pre-parsing (storing the raw data pertaining
388         /// to a single object and putting it into the parsing queue) to be
389         /// done in the same thread.
390         /// @note
391         ///  The default is to do these two tasks in two separate threads,
392         ///  which in some cases can give an additional 10-20% performance
393         ///  boost, wall-clock time wise.
ReadAndSkipInTheSameThread(bool same_thread)394         CParams& ReadAndSkipInTheSameThread(bool same_thread) {
395             m_SameThread = same_thread;  return *this;
396         }
397 
398     private:
399         launch    m_ThreadPolicy;
400         unsigned  m_MaxParserThreads;
401         size_t    m_MaxTotalRawSize;
402         size_t    m_MinRawBufferSize;
403         bool      m_SameThread;
404 
405         template<typename...> friend class CObjectIStreamAsyncIterator;
406     };
407 
408 
409     /// Construct iterator upon an object serialization stream
410     ///
411     /// @param istr
412     ///   Serial object stream
413     /// @param own_istr
414     ///   eTakeOwnership means that the input stream will be deleted
415     ///   automatically when the iterator gets destroyed
416     /// @param params
417     ///   Parsing algorithm's parameters
418     /// @param params
419     ///   Filtering and parsing parameters (default is no filtering)
420     template<typename TObj>
421     CObjectIStreamAsyncIterator(CObjectIStream& istr,
422                                 EOwnership own_istr         = eNoOwnership,
423                                 const CParams<TObj>& params = CParams<TObj>()) = delete;
424 
425     /// Construct end-of-stream (invalid) iterator
426     /// @sa IsValid()
427     CObjectIStreamAsyncIterator(void) = delete;
428 
429     // Copy-ctor and assignment
430     CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
431     CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);
432 
433     /// Advance to the next data object
434     CObjectIStreamAsyncIterator& operator++(void);
435 
436     // Comparison
437     bool operator==(const CObjectIStreamAsyncIterator&) const;
438     bool operator!=(const CObjectIStreamAsyncIterator&) const;
439 
440     /// Check whether the iterator points to a data
441     /// TRUE if the iterator is constructed upon a serialization stream AND
442     /// if it's not end-of-stream or error-in-stream
443     bool IsValid(void) const;
444 
445     /// Return data object which is currently pointed to by the iterator.
446     /// Throw an exception is the iterator does not point to a data, i.e.
447     /// if IsValid() is FALSE.
448     template<typename TObj>  TObj& operator*();
449 
450     /// Return pointer to data object which is currently pointed to by the
451     /// iterator.
452     /// Return NULL is the iterator does not point to a data, i.e.
453     /// if IsValid() is FALSE.
454     template<typename TObj>  TObj* operator->();
455 
456     /// Return self
457     CObjectIStreamAsyncIterator& begin(void);
458 
459     /// Construct and return end-of-stream iterator
460     CObjectIStreamAsyncIterator  end(void);
461 
462     // dtor
463     ~CObjectIStreamAsyncIterator();
464 };
465 
466 
467 
468 /////////////////////////////////////////////////////////////////////////////
469 /////////////////////////////////////////////////////////////////////////////
470 /// template specializations and implementation
471 
472 /////////////////////////////////////////////////////////////////////////////
473 ///  CObjectIStreamIterator<TRoot>
474 
475 template<typename TRoot>
476 class CObjectIStreamIterator<TRoot>
477     : public iterator< input_iterator_tag, TRoot, ptrdiff_t, TRoot*, TRoot& >
478 {
479 public:
480     using CParams = CObjectIStreamIterator<>::CParams<TRoot>;
481 
482     CObjectIStreamIterator( CObjectIStream& istr,
483                             EOwnership deleteInStream = eNoOwnership,
484                             const CParams& params     = CParams());
485 
486     CObjectIStreamIterator(void);
487     CObjectIStreamIterator(const CObjectIStreamIterator&);
488     CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);
489     ~CObjectIStreamIterator();
490 
491     CObjectIStreamIterator& operator++(void);
492     bool operator==(const CObjectIStreamIterator&) const;
493     bool operator!=(const CObjectIStreamIterator&) const;
494     bool IsValid(void) const;
495     const CObjectIStream& GetObjectIStream(void) const;
496 
497     TRoot& operator*();
498     TRoot* operator->();
499 
500     CObjectIStreamIterator& begin(void);
501     CObjectIStreamIterator  end(void);
502 
503 protected:
504     struct CData
505     {
506         CData(CObjectIStream& istr, EOwnership deleteInStream,
507               const CParams& params, TTypeInfo tinfo);
508         ~CData(void);
509 
510         void x_BeginRead(void);
511         void x_EndRead(void);
512         void x_AcceptData(CObjectIStream& in, const CObjectInfo& type);
513         void x_Next(void);
514         bool x_NextNoFilter(const CObjectInfo& objinfo);
515         bool x_NextSeqWithFilter(const CObjectInfo& objinfo);
516         bool x_NextChoiceWithFilter(const CObjectInfo& objinfo);
517         bool x_NextContainerWithFilter(const CObjectInfo& objinfo);
518 
519         CObjectIStream*    m_Istr;
520         EOwnership         m_Own;
521         CObjectTypeInfo    m_ValueType;
522         CObjectInfo        m_Value;
523         bool               m_HasReader;
524         bool               m_EndOfData;
525         CParams            m_Params;
526         mutex              m_ReaderMutex;
527         condition_variable m_ReaderCv;
528         thread             m_Reader;
529         exception_ptr      m_ReaderExpt;
530         enum EFilter {
531             eNone,
532             eOneSeq,
533             eOneRandom,
534             eAllSeq,
535             eAllRandom,
536             eOneChoice,
537             eAllChoice,
538             eOneContainer,
539             eAllContainer
540         } m_FilterType;
541 
542         template<typename TR>
543         class x_CObjectIStreamIteratorHook : public CSkipObjectHook
544         {
545         public:
x_CObjectIStreamIteratorHook(typename CObjectIStreamIterator<TR>::CData * pthis)546             x_CObjectIStreamIteratorHook(
547                 typename CObjectIStreamIterator<TR>::CData* pthis)
548                     : m_This(pthis) {
549             }
SkipObject(CObjectIStream & in,const CObjectTypeInfo & type)550             virtual void SkipObject(CObjectIStream& in, const CObjectTypeInfo& type) override {
551                 m_This->x_AcceptData(in,CObjectInfo(type.GetTypeInfo()));
552             }
553         private:
554             typename CObjectIStreamIterator<TR>::CData* m_This;
555         };
556     };
557 
558     CObjectIStreamIterator( CObjectIStream& istr,
559         const CParams& params, EOwnership deleteInStream);
560     void x_ReaderThread(void);
561     shared_ptr<CData> m_Data;
562 };
563 
564 /////////////////////////////////////////////////////////////////////////////
565 ///  CObjectIStreamIterator<TRoot, TChild>
566 
567 template<typename TRoot, typename TChild>
568 class CObjectIStreamIterator<TRoot, TChild>
569     : public CObjectIStreamIterator<TChild>
570 {
571 public:
572     using CParams = CObjectIStreamIterator<>::CParams<TChild>;
573 
574     CObjectIStreamIterator( CObjectIStream& istr,
575                             EOwnership deleteInStream  = eNoOwnership,
576                             const CParams& params      = CParams());
577 
578     CObjectIStreamIterator(void);
579     CObjectIStreamIterator(const CObjectIStreamIterator&);
580     CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);
581     ~CObjectIStreamIterator();
582 
583     CObjectIStreamIterator& operator++(void);
584     CObjectIStreamIterator& begin(void);
585     CObjectIStreamIterator  end(void);
586 
587 protected:
588     using CParent       = CObjectIStreamIterator<TChild>;
589     void x_ReaderThread(void);
590 
591     template<typename TR>
592     class x_CObjectIStreamIteratorReadHook : public CReadObjectHook
593     {
594     public:
x_CObjectIStreamIteratorReadHook(typename CObjectIStreamIterator<TR>::CData * pthis)595         x_CObjectIStreamIteratorReadHook(
596             typename CObjectIStreamIterator<TR>::CData* pthis)
597                 : m_This(pthis) {
598         }
ReadObject(CObjectIStream & in,const CObjectInfo & type)599         virtual void ReadObject(CObjectIStream& in, const CObjectInfo& type) override {
600             m_This->x_AcceptData(in,type);
601         }
602     private:
603         typename CObjectIStreamIterator<TR>::CData* m_This;
604     };
605 };
606 
607 
608 /////////////////////////////////////////////////////////////////////////////
609 // helpers
610 
611 namespace ns_ObjectIStreamFilterIterator {
612 
613 template<typename TRoot>
614 typename enable_if< is_base_of< CSerialObject, TRoot>::value, TTypeInfo>::type
xxx_GetTypeInfo(void)615 xxx_GetTypeInfo(void)
616 {
617     return TRoot::GetTypeInfo();
618 }
619 
620 template<typename TRoot>
621 //typename enable_if< !is_base_of< CSerialObject, TRoot>::value, TTypeInfo>::type
622 typename enable_if< is_pod<TRoot>::value || is_convertible<TRoot, std::string>::value, TTypeInfo>::type
xxx_GetTypeInfo(void)623 xxx_GetTypeInfo(void)
624 {
625     return CStdTypeInfo<TRoot>::GetTypeInfo();
626 }
627 
628 template<typename TRoot>
xxx_MemberIndex(const string & mem_name)629 TMemberIndex xxx_MemberIndex(const string& mem_name)
630 {
631     TTypeInfo tinfo = xxx_GetTypeInfo<TRoot>();
632     ETypeFamily type = tinfo->GetTypeFamily();
633     if (type == eTypeFamilyClass || type == eTypeFamilyChoice) {
634         const CClassTypeInfoBase* cinfo = CTypeConverter<CClassTypeInfoBase>::SafeCast(tinfo);
635         return cinfo->GetItems().Find(mem_name);
636     }
637     return kInvalidMember;
638 }
639 
640 } // ns_ObjectIStreamFilterIterator
641 
642 #if 0
643 template<typename...>
644 template<typename TObj>
645 void
646 CObjectIStreamIterator<>::CParams<TObj>::xxx_MemberIndex(const string& mem_name) {
647     TTypeInfo tinfo = ns_ObjectIStreamFilterIterator::xxx_GetTypeInfo<TObj>();
648     ETypeFamily type = tinfo->GetTypeFamily();
649     if (type == eTypeFamilyClass || type == eTypeFamilyChoice) {
650         const CClassTypeInfoBase* cinfo = CTypeConverter<CClassTypeInfoBase>::SafeCast(tinfo);
651         return cinfo->GetItems().Find(mem_name);
652     }
653     return kInvalidMember;
654 }
655 #endif
656 
657 
658 /////////////////////////////////////////////////////////////////////////////
659 ///  CObjectIStreamIterator<TRoot> implementation
660 
661 template<typename TRoot>
CObjectIStreamIterator(void)662 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(void)
663     : m_Data(nullptr) {
664 }
665 
666 template<typename TRoot>
CObjectIStreamIterator(CObjectIStream & istr,const CParams & params,EOwnership deleteInStream)667 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
668     CObjectIStream& istr, const CParams& params, EOwnership deleteInStream)
669     : m_Data( new CData(istr, deleteInStream, params,
670         ns_ObjectIStreamFilterIterator::xxx_GetTypeInfo<TRoot>())) {
671 }
672 
673 template<typename TRoot>
CObjectIStreamIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)674 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
675     CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
676     : CObjectIStreamIterator(istr, params, deleteInStream)
677 {
678     if (m_Data->m_FilterType != CData::eNone && !m_Data->m_EndOfData) {
679         m_Data->m_HasReader = true;
680         m_Data->m_Reader = thread( mem_fun<void, CObjectIStreamIterator<TRoot> >(
681             &CObjectIStreamIterator<TRoot>::x_ReaderThread), this);
682     }
683     ++(*this);
684 }
685 
686 template<typename TRoot>
CObjectIStreamIterator(const CObjectIStreamIterator & v)687 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
688     const CObjectIStreamIterator& v) : m_Data(v.m_Data) {
689 }
690 
691 template<typename TRoot>
692 CObjectIStreamIterator<TRoot>&
operator =(const CObjectIStreamIterator & v)693 CObjectIStreamIterator<TRoot>::operator=(const CObjectIStreamIterator& v) {
694     m_Data = v.m_Data;
695     return *this;
696 }
697 
698 template<typename TRoot>
~CObjectIStreamIterator()699 CObjectIStreamIterator<TRoot>::~CObjectIStreamIterator() {
700 }
701 
702 template<typename TRoot>
CData(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params,TTypeInfo tinfo)703 CObjectIStreamIterator<TRoot>::CData::CData(
704     CObjectIStream& istr, EOwnership deleteInStream,
705     const CParams& params, TTypeInfo tinfo)
706     : m_Istr(&istr), m_Own(deleteInStream)
707     , m_ValueType(tinfo), m_Value(tinfo), m_HasReader(false)
708     , m_EndOfData(m_Istr->EndOfData()), m_Params(params)
709 {
710     ETypeFamily type = tinfo->GetTypeFamily();
711     if (type != eTypeFamilyClass && type != eTypeFamilyChoice && type != eTypeFamilyContainer) {
712         m_Params.m_FnFilter = nullptr;
713     }
714     m_FilterType = eNone;
715     if (m_Params.m_FnFilter) {
716         if (type == eTypeFamilyClass) {
717             const CClassTypeInfo* cinfo = CTypeConverter<CClassTypeInfo>::SafeCast(tinfo);
718             if (cinfo->Implicit()) {
719                 const CItemInfo* itemInfo =
720                     cinfo->GetItems().GetItemInfo(cinfo->GetItems().FirstIndex());
721                 if (itemInfo->GetTypeInfo()->GetTypeFamily() == eTypeFamilyContainer) {
722                     m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
723                 }
724             }
725             if (m_FilterType == eNone) {
726                 bool is_random = cinfo->RandomOrder();
727                 if (m_Params.m_Index != kInvalidMember) {
728                     m_FilterType = is_random ? eOneRandom : eOneSeq;
729                 } else {
730                     m_FilterType = is_random ? eAllRandom : eAllSeq;
731                 }
732             }
733         } else if (type == eTypeFamilyChoice) {
734             m_FilterType = m_Params.m_Index != kInvalidMember ? eOneChoice : eAllChoice;
735         } else if (type == eTypeFamilyContainer) {
736             m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
737         }
738     }
739 }
740 
741 template<typename TRoot>
~CData(void)742 CObjectIStreamIterator<TRoot>::CData::~CData(void) {
743     if (m_Reader.joinable()) {
744         m_EndOfData = true;
745         m_ReaderCv.notify_all();
746         m_Reader.join();
747     }
748     if (m_Istr && m_Own == eTakeOwnership) {
749         delete m_Istr;
750     }
751 }
752 
753 template<typename TRoot>
754 void
x_BeginRead(void)755 CObjectIStreamIterator<TRoot>::CData::x_BeginRead(void) {
756     unique_lock<mutex> lck( m_ReaderMutex);
757     while (m_Value.GetObjectPtr() != nullptr) {
758         m_ReaderCv.wait(lck);
759     }
760 }
761 
762 template<typename TRoot>
763 void
x_EndRead(void)764 CObjectIStreamIterator<TRoot>::CData::x_EndRead(void) {
765     m_Value = CObjectInfo();
766     m_EndOfData = true;
767     m_ReaderCv.notify_one();
768 }
769 
770 template<typename TRoot>
771 void
x_ReaderThread()772 CObjectIStreamIterator<TRoot>::x_ReaderThread() {
773     m_Data->x_BeginRead();
774     try {
775         m_Data->m_ValueType.SetLocalSkipHook(*(m_Data->m_Istr), new typename CData::template x_CObjectIStreamIteratorHook<TRoot>(m_Data.get()));
776         while (Serial_FilterSkip(*(m_Data->m_Istr),m_Data->m_ValueType))
777             ;
778     } catch (...) {
779         if (!m_Data->m_EndOfData) {
780             m_Data->m_ReaderExpt = current_exception();
781         }
782     }
783     m_Data->x_EndRead();
784 }
785 
786 template<typename TRoot, typename TChild>
787 void
x_ReaderThread()788 CObjectIStreamIterator<TRoot,TChild>::x_ReaderThread() {
789     this->m_Data->x_BeginRead();
790     try {
791         this->m_Data->m_ValueType.SetLocalSkipHook(*(this->m_Data->m_Istr), new typename CParent::CData::template x_CObjectIStreamIteratorHook<TChild>(this->m_Data.get()));
792         this->m_Data->m_ValueType.SetLocalReadHook(*(this->m_Data->m_Istr), new x_CObjectIStreamIteratorReadHook<TChild>(this->m_Data.get()));
793         while (Serial_FilterSkip(*(this->m_Data->m_Istr),CObjectTypeInfo(CType<TRoot>())))
794             ;
795     } catch (...) {
796         if (!this->m_Data->m_EndOfData) {
797             this->m_Data->m_ReaderExpt = current_exception();
798         }
799     }
800     this->m_Data->x_EndRead();
801 }
802 
803 template<typename TRoot>
804 void
x_AcceptData(CObjectIStream & in,const CObjectInfo & objinfo)805 CObjectIStreamIterator<TRoot>::CData::x_AcceptData(
806     CObjectIStream& in, const CObjectInfo& objinfo)
807 {
808     if (m_Istr->EndOfData()) {
809         m_EndOfData = true;
810     } else {
811         bool res = false;
812         switch ( m_FilterType) {
813         default:
814         case eNone:
815             res = x_NextNoFilter(objinfo);
816             break;
817         case eOneSeq:
818         case eOneRandom:
819         case eAllSeq:
820         case eAllRandom:
821             res = x_NextSeqWithFilter(objinfo);
822             break;
823         case eOneChoice:
824         case eAllChoice:
825             res = x_NextChoiceWithFilter(objinfo);
826             break;
827         case eOneContainer:
828         case eAllContainer:
829             res = x_NextContainerWithFilter(objinfo);
830             break;
831         }
832         if (res) {
833             unique_lock<mutex> lck(m_ReaderMutex);
834             m_Value = objinfo;
835             m_ReaderCv.notify_one();
836             while (m_Value.GetObjectPtr() != nullptr) {
837                 if (m_EndOfData) {
838                     NCBI_THROW( CEofException, eEof,
839                         "CObjectIStreamIterator: abort data parsing");
840                 }
841                 m_ReaderCv.wait(lck);
842             }
843         } else {
844             in.SetDiscardCurrObject();
845         }
846     }
847 }
848 
849 template<typename TRoot>
850 void
x_Next(void)851 CObjectIStreamIterator<TRoot>::CData::x_Next(void) {
852     unique_lock<mutex> lck(m_ReaderMutex);
853     m_Value = CObjectInfo();
854     m_ReaderCv.notify_one();
855     while (m_Value.GetObjectPtr() == nullptr && !m_EndOfData) {
856         m_ReaderCv.wait(lck);
857     }
858     if (m_ReaderExpt) {
859         rethrow_exception(m_ReaderExpt);
860     }
861 }
862 
863 template<typename TRoot>
864 bool
x_NextNoFilter(const CObjectInfo & objinfo)865 CObjectIStreamIterator<TRoot>::CData::x_NextNoFilter(const CObjectInfo& objinfo)
866 {
867     objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
868     return true;
869 }
870 
871 template<typename TRoot>
872 bool
x_NextSeqWithFilter(const CObjectInfo & objinfo)873 CObjectIStreamIterator<TRoot>::CData::x_NextSeqWithFilter(const CObjectInfo& objinfo)
874 {
875     TMemberIndex mi = kInvalidMember;
876     set<TMemberIndex> done;
877     bool checked = false;
878     bool valid = true;
879     TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
880 
881     for ( CIStreamClassMemberIterator i(*m_Istr, objinfo); i; ++i ) {
882 
883         TMemberIndex mi_now = (*i).GetMemberIndex();
884         CObjectInfoMI minfo(objinfo, mi_now);
885 
886         if (valid) {
887 // before read - validate missing members
888             switch (m_FilterType) {
889             case eOneRandom:
890             case eAllRandom:
891             default:
892                 break;
893             case eOneSeq:
894                 if (mi_now > m_Params.m_Index && !checked) {
895                     valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
896                     checked = true;
897                 }
898                 break;
899             case eAllSeq:
900                 for (++mi; valid && mi < mi_now; ++mi) {
901                     valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
902                 }
903                 break;
904             }
905         }
906 
907 // if still valid
908         if (valid) {
909 // read next member
910             i.ReadClassMember(objinfo);
911 
912 // after read - validate member
913             switch (m_FilterType) {
914             default: break;
915             case eOneSeq:
916             case eOneRandom:
917                 if (mi_now == m_Params.m_Index) {
918                     CObjectInfo oi = minfo.GetMember().GetTypeFamily() == eTypeFamilyPointer ?
919                         minfo.GetMember().GetPointedObject() : minfo.GetMember();
920                     valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
921                     checked = true;
922                 }
923                 break;
924             case eAllRandom:
925                 done.insert(mi_now);
926                 // no break
927                 NCBI_FALLTHROUGH;
928             case eAllSeq:
929                 {
930                     CObjectInfo oi = minfo.GetMember().GetTypeFamily() == eTypeFamilyPointer ?
931                         minfo.GetMember().GetPointedObject() : minfo.GetMember();
932                     valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
933                 }
934                 break;
935             }
936         } else {
937 // object invalid - skip remaining members
938             i.SkipClassMember();
939         }
940         mi = mi_now;
941     }
942 
943 // finally - validate missing members
944     if (valid) {
945         switch (m_FilterType) {
946         default: break;
947         case eOneSeq:
948         case eOneRandom:
949             if (!checked) {
950                 valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
951             }
952             break;
953         case eAllSeq:
954             {
955                 TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
956                 for (++mi; valid && mi < mi_last; ++mi) {
957                     valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
958                 }
959             }
960             break;
961         case eAllRandom:
962             {
963                 mi = objinfo.GetClassTypeInfo()->GetItems().FirstIndex();
964                 TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
965                 for (; valid && mi < mi_last; ++mi) {
966                     if (done.find(mi) == done.end()) {
967                         valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
968                     }
969                 }
970             }
971             break;
972         }
973     }
974     return valid;
975 }
976 
977 template<typename TRoot>
978 bool
x_NextChoiceWithFilter(const CObjectInfo & objinfo)979 CObjectIStreamIterator<TRoot>::CData::x_NextChoiceWithFilter(const CObjectInfo& objinfo)
980 {
981     bool valid = true;
982     objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
983     TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
984     CObjectInfoCV cv = objinfo.GetCurrentChoiceVariant();
985     TMemberIndex i = cv.GetVariantIndex();
986     if (i == m_Params.m_Index) {
987         CObjectInfo oi = cv.GetVariant().GetTypeFamily() == eTypeFamilyPointer ?
988             cv.GetVariant().GetPointedObject() : cv.GetVariant();
989         valid = m_Params.m_FnFilter( *m_Istr, obj, i, &oi, m_Params.m_Extra);
990     } else {
991         valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
992     }
993     return valid;
994 }
995 
996 template<typename TRoot>
997 bool
x_NextContainerWithFilter(const CObjectInfo & objinfo)998 CObjectIStreamIterator<TRoot>::CData::x_NextContainerWithFilter(const CObjectInfo& objinfo)
999 {
1000     TMemberIndex mi = kInvalidMember;
1001     bool valid = true;
1002     TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
1003 
1004     for ( CIStreamContainerIterator i(*m_Istr, objinfo); i; ++i ) {
1005         if (valid) {
1006             CObjectInfo oi(i.ReadElement(objinfo.GetObjectPtr()));
1007             ++mi;
1008             if (oi.GetObjectPtr()) {
1009                 if (m_FilterType == eAllContainer || (m_FilterType == eOneContainer && mi == m_Params.m_Index)) {
1010                     CObjectInfo oe = oi.GetTypeFamily() == eTypeFamilyPointer ? oi.GetPointedObject() : oi;
1011                     valid = m_Params.m_FnFilter( *m_Istr, obj, mi, &oe, m_Params.m_Extra);
1012                 }
1013             }
1014         } else {
1015             i.SkipElement();
1016         }
1017     }
1018     return valid;
1019 }
1020 
1021 template<typename TRoot>
1022 CObjectIStreamIterator<TRoot>&
operator ++(void)1023 CObjectIStreamIterator<TRoot>::operator++(void) {
1024     if (m_Data.get() != nullptr) {
1025         if (!m_Data->m_HasReader) {
1026             if (m_Data->m_Istr->EndOfData()) {
1027                 m_Data.reset();
1028             } else {
1029                 m_Data->m_Value = CObjectInfo(m_Data->m_ValueType);
1030                 m_Data->m_Istr->Read(m_Data->m_Value);
1031             }
1032         } else {
1033             m_Data->x_Next();
1034             if (m_Data->m_EndOfData) {
1035                 m_Data.reset();
1036             }
1037         }
1038     }
1039     return *this;
1040 }
1041 
1042 template<typename TRoot>
1043 bool
operator ==(const CObjectIStreamIterator & v) const1044 CObjectIStreamIterator<TRoot>::operator==(
1045     const CObjectIStreamIterator& v) const {
1046     return m_Data.get() == v.m_Data.get();
1047 }
1048 
1049 template<typename TRoot>
1050 bool
operator !=(const CObjectIStreamIterator & v) const1051 CObjectIStreamIterator<TRoot>::operator!=(
1052     const CObjectIStreamIterator& v) const {
1053     return m_Data.get() != v.m_Data.get();
1054 }
1055 
1056 template<typename TRoot>
IsValid() const1057 bool CObjectIStreamIterator<TRoot>::IsValid() const {
1058     return m_Data.get() != nullptr && m_Data->m_Value.GetObjectPtr() != nullptr;
1059 }
1060 
1061 template<typename TRoot>
1062 const CObjectIStream&
GetObjectIStream(void) const1063 CObjectIStreamIterator<TRoot>::GetObjectIStream(void) const {
1064     return *(m_Data->m_Istr);
1065 }
1066 
1067 template<typename TRoot>
1068 TRoot&
operator *()1069 CObjectIStreamIterator<TRoot>::operator*() {
1070     return *(TRoot*)(m_Data->m_Value.GetObjectPtr());
1071 }
1072 
1073 template<typename TRoot>
1074 TRoot*
operator ->()1075 CObjectIStreamIterator<TRoot>::operator->() {
1076     return IsValid() ? (TRoot*)m_Data->m_Value.GetObjectPtr() : nullptr;
1077 }
1078 
1079 template<typename TRoot>
1080 CObjectIStreamIterator<TRoot>&
begin(void)1081 CObjectIStreamIterator<TRoot>::begin(void) {
1082     return *this;
1083 }
1084 
1085 template<typename TRoot>
1086 CObjectIStreamIterator<TRoot>
end(void)1087 CObjectIStreamIterator<TRoot>::end(void) {
1088     return CObjectIStreamIterator<TRoot>();
1089 }
1090 
1091 
1092 /////////////////////////////////////////////////////////////////////////////
1093 ///  CObjectIStreamIterator<TRoot, TChild> implementation
1094 
1095 template<typename TRoot, typename TChild>
CObjectIStreamIterator(void)1096 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(void)
1097     : CParent() {
1098 }
1099 
1100 template<typename TRoot, typename TChild>
CObjectIStreamIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1101 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(
1102     CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
1103     : CParent(istr, params, deleteInStream)
1104 {
1105     if (!this->m_Data->m_EndOfData) {
1106         this->m_Data->m_HasReader = true;
1107         this->m_Data->m_Reader = thread( mem_fun<void, CObjectIStreamIterator<TRoot,TChild> >(
1108             &CObjectIStreamIterator<TRoot,TChild>::x_ReaderThread), this);
1109     }
1110     ++(*this);
1111 }
1112 
1113 template<typename TRoot, typename TChild>
CObjectIStreamIterator(const CObjectIStreamIterator & v)1114 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(
1115     const CObjectIStreamIterator& v) : CParent(v) {
1116 }
1117 
1118 template<typename TRoot, typename TChild>
1119 CObjectIStreamIterator<TRoot, TChild>&
operator =(const CObjectIStreamIterator & v)1120 CObjectIStreamIterator<TRoot, TChild>::operator=(
1121     const CObjectIStreamIterator& v) {
1122     CParent::operator=(v);
1123     return *this;
1124 }
1125 
1126 template<typename TRoot, typename TChild>
~CObjectIStreamIterator()1127 CObjectIStreamIterator<TRoot, TChild>::~CObjectIStreamIterator() {
1128 }
1129 
1130 template<typename TRoot, typename TChild>
1131 CObjectIStreamIterator<TRoot, TChild>&
operator ++(void)1132 CObjectIStreamIterator<TRoot, TChild>::operator++(void) {
1133     CParent::operator++();
1134     return *this;
1135 }
1136 
1137 template<typename TRoot, typename TChild>
1138 CObjectIStreamIterator<TRoot, TChild>&
begin(void)1139 CObjectIStreamIterator<TRoot, TChild>::begin(void) {
1140     return *this;
1141 }
1142 
1143 template<typename TRoot, typename TChild>
1144 CObjectIStreamIterator<TRoot, TChild>
end(void)1145 CObjectIStreamIterator<TRoot, TChild>::end(void) {
1146     return CObjectIStreamIterator<TRoot, TChild>();
1147 }
1148 
1149 
1150 /////////////////////////////////////////////////////////////////////////////
1151 ///   CObjectIStreamAsyncIterator<TRoot>
1152 
1153 template<typename TRoot>
1154 class CObjectIStreamAsyncIterator<TRoot>
1155     : public iterator< input_iterator_tag, TRoot, ptrdiff_t, TRoot*, TRoot& >
1156 {
1157 public:
1158     using CParams = CObjectIStreamAsyncIterator<>::CParams<TRoot>;
1159 
1160     CObjectIStreamAsyncIterator( CObjectIStream& istr,
1161                                  EOwnership deleteInStream = eNoOwnership,
1162                                  const CParams& params     =  CParams());
1163     CObjectIStreamAsyncIterator(void);
1164     CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
1165     CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);
1166     ~CObjectIStreamAsyncIterator();
1167 
1168     CObjectIStreamAsyncIterator& operator++(void);
1169     bool operator==(const CObjectIStreamAsyncIterator&) const;
1170     bool operator!=(const CObjectIStreamAsyncIterator&) const;
1171     bool IsValid(void) const;
1172 
1173     TRoot& operator*();
1174     TRoot* operator->();
1175 
1176     CObjectIStreamAsyncIterator& begin(void);
1177     CObjectIStreamAsyncIterator  end(void);
1178 
1179 
1180 protected:
1181     typedef queue< CRef<TRoot> > TObjectsQueue;
1182 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1183     typedef function<TObjectsQueue(CRef<CByteSource>, ESerialDataFormat, const CParams&, TObjectsQueue)> FParserFunction;
1184 #else
1185     typedef function<TObjectsQueue(CRef<CByteSource>, ESerialDataFormat, const CParams&, TObjectsQueue&&)> FParserFunction;
1186 #endif
1187     CObjectIStreamAsyncIterator( CObjectIStream& istr,
1188                                  EOwnership deleteInStream,
1189                                  FParserFunction parser,
1190                                  const CParams& params);
1191 private:
1192     static TObjectsQueue sx_ClearGarbageAndParse(
1193             CRef<CByteSource> bytesource,  ESerialDataFormat format,
1194             const CParams& params,
1195 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1196             TObjectsQueue garbage
1197 #else
1198             TObjectsQueue&& garbage
1199 #endif
1200             );
1201 
1202     struct CData {
1203         CData(CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
1204             const CParams& params);
1205         ~CData(void);
1206 
1207         using future_queue_t  = future<TObjectsQueue>;
1208         using futures_queue_t = queue<future_queue_t>;
1209 
1210         void x_UpdateObjectsQueue();
1211         void x_UpdateFuturesQueue();
1212         CRef< CByteSource > x_GetNextData(void);
1213         void x_ReaderThread(void);
1214 
1215         TObjectsQueue m_ObjectsQueue; // current queue of objects
1216         TObjectsQueue m_GarbageQueue; // popped so-far from objects-queue
1217         futures_queue_t m_FuturesQueue; // queue-of-futures-of-object-queues
1218 
1219         CObjectIStream* m_Istr;
1220         EOwnership      m_Own;
1221         FParserFunction m_Parser;
1222         size_t          m_ParserCount;
1223         size_t          m_RawBufferSize;
1224         size_t          m_MaxRawSize;
1225         size_t          m_CurrentRawSize;
1226         launch          m_Policy;
1227         bool            m_EndOfData;
1228         CParams         m_Params;
1229 
1230         mutex                        m_ReaderMutex;
1231         condition_variable           m_ReaderCv;
1232         thread                       m_Reader;
1233         queue< CRef< CByteSource > > m_ReaderData;
1234         queue< size_t >              m_ReaderDataSize;
1235     };
1236     shared_ptr<CData> m_Data;
1237 };
1238 
1239 
1240 /////////////////////////////////////////////////////////////////////////////
1241 ///  CObjectIStreamAsyncIterator<TRoot, TChild>
1242 
1243 template<typename TRoot, typename TChild>
1244 class CObjectIStreamAsyncIterator<TRoot, TChild>
1245     : public CObjectIStreamAsyncIterator<TChild>
1246 {
1247 public:
1248     using CParams = CObjectIStreamAsyncIterator<>::CParams<TChild>;
1249 
1250     CObjectIStreamAsyncIterator( CObjectIStream& istr,
1251                                  EOwnership deleteInStream = eNoOwnership,
1252                                  const CParams& params     = CParams());
1253     CObjectIStreamAsyncIterator(void);
1254     CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
1255     CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);
1256     ~CObjectIStreamAsyncIterator();
1257 
1258     CObjectIStreamAsyncIterator& operator++(void);
1259     CObjectIStreamAsyncIterator& begin(void);
1260     CObjectIStreamAsyncIterator  end(void);
1261 
1262 
1263 private:
1264     using CParent       = CObjectIStreamAsyncIterator<TChild>;
1265     using TObjectsQueue = typename CParent::TObjectsQueue;
1266 
1267     static TObjectsQueue sx_ClearGarbageAndParse(
1268             CRef<CByteSource> bytesource,  ESerialDataFormat format,
1269             const CParams& params,
1270 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1271             TObjectsQueue garbage
1272 #else
1273             TObjectsQueue&& garbage
1274 #endif
1275             );
1276 };
1277 
1278 /////////////////////////////////////////////////////////////////////////////
1279 ///  CObjectIStreamAsyncIterator<TRoot> implementation
1280 
1281 template<typename TRoot>
CObjectIStreamAsyncIterator(void)1282 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(void)
1283     : m_Data(nullptr)
1284 {
1285 }
1286 
1287 template<typename TRoot>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1288 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1289         CObjectIStream& istr, EOwnership deleteInStream,
1290         const CParams& params)
1291     : CObjectIStreamAsyncIterator( istr, deleteInStream,
1292         &CObjectIStreamAsyncIterator<TRoot>::sx_ClearGarbageAndParse, params)
1293 {
1294 }
1295 
1296 template<typename TRoot>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,FParserFunction parser,const CParams & params)1297 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1298     CObjectIStream& istr,  EOwnership deleteInStream,
1299     FParserFunction parser,  const CParams& params)
1300     : m_Data(new CData(istr, deleteInStream, parser, params))
1301 {
1302     ++(*this);
1303 }
1304 
1305 template<typename TRoot>
CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator & v)1306 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1307     const CObjectIStreamAsyncIterator& v)
1308         : m_Data(v.m_Data)
1309 {
1310 }
1311 
1312 template<typename TRoot>
1313 CObjectIStreamAsyncIterator<TRoot>&
operator =(const CObjectIStreamAsyncIterator & v)1314 CObjectIStreamAsyncIterator<TRoot>::operator=(
1315     const CObjectIStreamAsyncIterator& v) {
1316     m_Data = v.m_Data;
1317     return *this;
1318 }
1319 
1320 template<typename TRoot>
~CObjectIStreamAsyncIterator()1321 CObjectIStreamAsyncIterator<TRoot>::~CObjectIStreamAsyncIterator() {
1322 }
1323 
1324 template<typename TRoot>
1325 CObjectIStreamAsyncIterator<TRoot>&
operator ++()1326 CObjectIStreamAsyncIterator<TRoot>::operator++() {
1327     if (m_Data.get() != nullptr) {
1328         do {
1329             m_Data->x_UpdateFuturesQueue();
1330             m_Data->x_UpdateObjectsQueue();
1331         } while (!IsValid() && !m_Data->m_EndOfData);
1332         if (!IsValid()) {
1333             m_Data.reset();
1334         }
1335     }
1336     return *this;
1337 }
1338 
1339 template<typename TRoot>
1340 bool
operator ==(const CObjectIStreamAsyncIterator & v) const1341 CObjectIStreamAsyncIterator<TRoot>::operator==(
1342     const CObjectIStreamAsyncIterator& v) const {
1343     return m_Data.get() == v.m_Data.get();
1344 }
1345 
1346 template<typename TRoot>
1347 bool
operator !=(const CObjectIStreamAsyncIterator & v) const1348 CObjectIStreamAsyncIterator<TRoot>::operator!=(
1349     const CObjectIStreamAsyncIterator& v) const {
1350     return m_Data.get() != v.m_Data.get();
1351 }
1352 
1353 template<typename TRoot>
IsValid() const1354 bool CObjectIStreamAsyncIterator<TRoot>::IsValid() const {
1355     return m_Data.get() != nullptr && !m_Data->m_ObjectsQueue.empty();
1356 }
1357 
1358 template<typename TRoot>
1359 TRoot&
operator *()1360 CObjectIStreamAsyncIterator<TRoot>::operator*() {
1361     return m_Data->m_ObjectsQueue.front().GetObject();
1362 }
1363 
1364 template<typename TRoot>
1365 TRoot*
operator ->()1366 CObjectIStreamAsyncIterator<TRoot>::operator->() {
1367     return IsValid() ? m_Data->m_ObjectsQueue.front().GetPointer() : nullptr;
1368 }
1369 
1370 template<typename TRoot>
1371 CObjectIStreamAsyncIterator<TRoot>&
begin(void)1372 CObjectIStreamAsyncIterator<TRoot>::begin(void) {
1373     return *this;
1374 }
1375 
1376 template<typename TRoot>
1377 CObjectIStreamAsyncIterator<TRoot>
end(void)1378 CObjectIStreamAsyncIterator<TRoot>::end(void) {
1379     return CObjectIStreamAsyncIterator<TRoot>();
1380 }
1381 
1382 
1383 template<typename TRoot>
1384 typename CObjectIStreamAsyncIterator<TRoot>::TObjectsQueue
sx_ClearGarbageAndParse(CRef<CByteSource> bytesource,ESerialDataFormat format,const CParams & params,TObjectsQueue garbage)1385 CObjectIStreamAsyncIterator<TRoot>::sx_ClearGarbageAndParse(
1386         CRef<CByteSource> bytesource,
1387         ESerialDataFormat format,
1388         const CParams& params,
1389 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1390         TObjectsQueue garbage
1391 #else
1392         TObjectsQueue&& garbage
1393 #endif
1394         )
1395 {
1396     {{
1397         TObjectsQueue dummy;
1398         swap(garbage, dummy);
1399         // garbage now gets destroyed, if last-reference
1400     }}
1401 
1402     // deserialize objects from bytesource
1403     unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1404 
1405     TObjectsQueue queue;
1406     if (params.m_FnFilter) {
1407         for (TRoot& object : CObjectIStreamIterator<TRoot>( *istr, eNoOwnership, params)) {
1408             queue.push( CRef<TRoot>(&object));
1409         }
1410     } else {
1411         while(!istr->EndOfData()) {
1412             CRef<TRoot> object(new TRoot);
1413             istr->Read(&*object, object->GetThisTypeInfo());
1414             queue.push(object);
1415         }
1416     }
1417     return queue;
1418 }
1419 
1420 template<typename TRoot>
CData(CObjectIStream & istr,EOwnership deleteInStream,FParserFunction parser,const CParams & params)1421 CObjectIStreamAsyncIterator<TRoot>::CData::CData(
1422         CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
1423         const CParams& params)
1424     : m_Istr(&istr)
1425     , m_Own(deleteInStream)
1426     , m_Parser(parser)
1427     , m_ParserCount(   params.m_MaxParserThreads != 0 ? params.m_MaxParserThreads : 16)
1428     , m_RawBufferSize( params.m_MinRawBufferSize)
1429     , m_MaxRawSize(    params.m_SameThread ? 0 : params.m_MaxTotalRawSize)
1430     , m_CurrentRawSize(0)
1431     , m_Policy(params.m_ThreadPolicy)
1432     , m_EndOfData(m_Istr->EndOfData())
1433     , m_Params(params)
1434 {
1435     if (m_MaxRawSize != 0 && !m_EndOfData) {
1436         m_Reader = thread(
1437             mem_fun<void, CObjectIStreamAsyncIterator<TRoot>::CData >(
1438                 &CObjectIStreamAsyncIterator<TRoot>::CData::x_ReaderThread), this);
1439     }
1440 }
1441 
1442 template<typename TRoot>
~CData()1443 CObjectIStreamAsyncIterator<TRoot>::CData::~CData() {
1444     if (m_Reader.joinable()) {
1445         m_EndOfData = true;
1446         m_ReaderCv.notify_all();
1447         m_Reader.join();
1448     }
1449     if (m_Istr && m_Own == eTakeOwnership) {
1450         delete m_Istr;
1451     }
1452 }
1453 
1454 // m_GarbageQueue processing:
1455 //
1456 // When the current object (the one returned by operator*())
1457 // goes out of scope, if it is the last reference, the
1458 // destructor of the object will be called from main
1459 // thread, which is an expensive operation, which
1460 // we want to offload to a different thread - "here are some
1461 // objects - just let them go out of scope"
1462 //
1463 // So before calling m_ObjectsQueue pop we'll save the
1464 // current object in the garbage-queue to prevent it from being
1465 // destructed at this time, and will pass the garbage
1466 // queue to sx_ClearGarbageAndParse (executed asynchrously),
1467 // where the destructors of the garbage-objects will be called
1468 // (as apporpriate, as determined by CRefs going out of scope)
1469 
1470 template<typename TRoot>
1471 void
x_UpdateObjectsQueue()1472 CObjectIStreamAsyncIterator<TRoot>::CData::x_UpdateObjectsQueue()
1473 {
1474     // bring the next objects up front; save the garbage
1475     if(!m_ObjectsQueue.empty()) {
1476         m_GarbageQueue.push( m_ObjectsQueue.front());
1477         m_ObjectsQueue.pop();
1478     }
1479 
1480     // unpack the next objects-queue from futures-queue if empty
1481     if(    m_ObjectsQueue.empty()
1482         && !m_FuturesQueue.empty())
1483     {
1484         m_ObjectsQueue = m_FuturesQueue.front().get();
1485         m_FuturesQueue.pop();
1486     }
1487 }
1488 
1489 template<typename TRoot>
1490 void
x_UpdateFuturesQueue()1491 CObjectIStreamAsyncIterator<TRoot>::CData::x_UpdateFuturesQueue()
1492 {
1493     // nothing to deserialize, or already full
1494     if( m_FuturesQueue.size() >= m_ParserCount) {
1495         return;
1496     }
1497     if (m_EndOfData ||
1498         // no raw data ready yet, but we still have work to do
1499         (m_MaxRawSize != 0 && m_ReaderData.empty() && !m_FuturesQueue.empty())) {
1500         return;
1501     }
1502     CRef< CByteSource > data = x_GetNextData();
1503     if (data.IsNull()) {
1504         m_EndOfData = true;
1505         return;
1506     }
1507 
1508 #if 0
1509     // for reference / profiling: clearing the garbage-queue
1510     // from this thread will make the processing considerably slower.
1511     // Instead, we'll pass the garbage to the async call below.
1512     if(false) {
1513             TObjectsQueue dummy;
1514             swap(m_GarbageQueue, dummy);
1515     }
1516 #endif
1517 
1518     // launch async task to deserialize objects
1519     // from the skipped bytes in delay-buffer, and
1520     // also pass the garbage queue for destruction
1521     // of contents.
1522     // note: we can't move m_GarbageQueue directly
1523     // as it lacks ::clear() method that could restore
1524     // it to a valid empty state after move.
1525 
1526     TObjectsQueue tmp_garbage_queue;
1527     swap(m_GarbageQueue, tmp_garbage_queue);
1528 
1529     m_FuturesQueue.push( async( m_Policy, m_Parser,
1530         data,  m_Istr->GetDataFormat(), m_Params, move(tmp_garbage_queue)));
1531 }
1532 
1533 template<typename TRoot>
1534 CRef< CByteSource >
x_GetNextData(void)1535 CObjectIStreamAsyncIterator<TRoot>::CData::x_GetNextData(void)
1536 {
1537     if (m_MaxRawSize == 0) {
1538         // read raw data in this (main) thread
1539         if (m_Istr->EndOfData()) {
1540             return CRef< CByteSource >();
1541         }
1542         const CNcbiStreampos endpos =
1543             m_Istr->GetStreamPos()  + (CNcbiStreampos)(m_RawBufferSize);
1544         CStreamDelayBufferGuard guard(*m_Istr);
1545         do {
1546             m_Istr->SkipAnyContentObject();
1547         } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1548         return guard.EndDelayBuffer();
1549     }
1550 
1551     // get raw data prepared by reader
1552     unique_lock<mutex> lck(m_ReaderMutex);
1553     while (m_ReaderData.empty()) {
1554         m_ReaderCv.wait(lck);
1555     }
1556     CRef< CByteSource > data = m_ReaderData.front();
1557     m_ReaderData.pop();
1558     m_CurrentRawSize -= m_ReaderDataSize.front();
1559     m_ReaderDataSize.pop();
1560     m_ReaderCv.notify_one();
1561     return data;
1562 }
1563 
1564 template<typename TRoot>
1565 void
x_ReaderThread(void)1566 CObjectIStreamAsyncIterator<TRoot>::CData::x_ReaderThread(void)
1567 {
1568     // Skip over some objects in stream without parsing, up to buffer_size.
1569     while (!m_Istr->EndOfData()) {
1570         const CNcbiStreampos startpos = m_Istr->GetStreamPos();
1571         const CNcbiStreampos endpos =
1572             startpos  + (CNcbiStreampos)(m_RawBufferSize);
1573 
1574         CStreamDelayBufferGuard guard(*(m_Istr));
1575         try {
1576             do {
1577                 m_Istr->SkipAnyContentObject();
1578             } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1579         } catch (...) {
1580         }
1581 
1582         size_t this_buffer_size = m_Istr->GetStreamPos() - startpos;
1583         CRef< CByteSource > data = guard.EndDelayBuffer();
1584         {
1585             unique_lock<mutex> lck(m_ReaderMutex);
1586             // make sure we do not consume too much memory
1587             while (!m_EndOfData && m_CurrentRawSize >= m_MaxRawSize) {
1588                 m_ReaderCv.wait(lck);
1589             }
1590             if (m_EndOfData) {
1591                 break;
1592             }
1593             m_ReaderData.push( data);
1594             m_ReaderDataSize.push( this_buffer_size);
1595             m_CurrentRawSize += this_buffer_size;
1596             m_ReaderCv.notify_one();
1597         }
1598     }
1599     CRef< CByteSource > data;
1600     m_ReaderMutex.lock();
1601     m_ReaderData.push( data);
1602     m_ReaderDataSize.push(0);
1603     m_ReaderMutex.unlock();
1604     m_ReaderCv.notify_one();
1605 }
1606 
1607 
1608 /////////////////////////////////////////////////////////////////////////////
1609 ///  CObjectIStreamAsyncIterator<TRoot,TChild> implementation
1610 
1611 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(void)1612 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(void)
1613     : CParent()
1614 {
1615 }
1616 
1617 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1618 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(
1619         CObjectIStream& istr, EOwnership deleteInStream,
1620         const CParams& params)
1621     : CParent(istr, deleteInStream,
1622         &CObjectIStreamAsyncIterator<TRoot, TChild>::sx_ClearGarbageAndParse, params)
1623 {
1624 }
1625 
1626 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator & v)1627 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(
1628     const CObjectIStreamAsyncIterator& v)
1629         : CParent(v)
1630 {
1631 }
1632 
1633 template<typename TRoot, typename TChild>
1634 CObjectIStreamAsyncIterator<TRoot, TChild>&
operator =(const CObjectIStreamAsyncIterator & v)1635 CObjectIStreamAsyncIterator<TRoot, TChild>::operator=(
1636     const CObjectIStreamAsyncIterator& v) {
1637     CParent::operator=(v);
1638     return *this;
1639 }
1640 
1641 template<typename TRoot, typename TChild>
~CObjectIStreamAsyncIterator()1642 CObjectIStreamAsyncIterator<TRoot, TChild>::~CObjectIStreamAsyncIterator() {
1643 }
1644 
1645 template<typename TRoot, typename TChild>
1646 CObjectIStreamAsyncIterator<TRoot, TChild>&
operator ++()1647 CObjectIStreamAsyncIterator<TRoot, TChild>::operator++() {
1648     CParent::operator++();
1649     return *this;
1650 }
1651 
1652 template<typename TRoot, typename TChild>
1653 CObjectIStreamAsyncIterator<TRoot, TChild>&
begin(void)1654 CObjectIStreamAsyncIterator<TRoot, TChild>::begin(void) {
1655     return *this;
1656 }
1657 
1658 template<typename TRoot, typename TChild>
1659 CObjectIStreamAsyncIterator<TRoot, TChild>
end(void)1660 CObjectIStreamAsyncIterator<TRoot, TChild>::end(void) {
1661     return CObjectIStreamAsyncIterator<TRoot, TChild>();
1662 }
1663 
1664 template<typename TRoot, typename TChild>
1665 typename CObjectIStreamAsyncIterator<TRoot, TChild>::TObjectsQueue
sx_ClearGarbageAndParse(CRef<CByteSource> bytesource,ESerialDataFormat format,const CParams & params,TObjectsQueue garbage)1666 CObjectIStreamAsyncIterator<TRoot, TChild>::sx_ClearGarbageAndParse(
1667         CRef<CByteSource> bytesource,
1668         ESerialDataFormat format,
1669         const CParams& params,
1670 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1671         TObjectsQueue garbage
1672 #else
1673         TObjectsQueue&& garbage
1674 #endif
1675         )
1676 {
1677     {{
1678         TObjectsQueue dummy;
1679         swap(garbage, dummy);
1680         // garbage now gets destroyed, if last-reference
1681     }}
1682 
1683     // deserialize objects from bytesource
1684     unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1685     TObjectsQueue queue;
1686     for (TChild& object : CObjectIStreamIterator<TRoot, TChild>( *istr, eNoOwnership, params)) {
1687         queue.push( CRef<TChild>(&object));
1688     }
1689     return queue;
1690 }
1691 
1692 
1693 
1694 
1695 /////////////////////////////////////////////////////////////////////////////
1696 /////////////////////////////////////////////////////////////////////////////
1697 // Iterate over objects in input stream
1698 //
1699 // IMPORTANT: the following API requires multi-threading
1700 //
1701 // IMPORTANT: this API is deprecated, use CObjectIStreamIterator instead (defined above)
1702 
1703 
1704 template<typename TRoot, typename TObject>
1705 class CIStreamIteratorThread_Base;
1706 
1707 // Helper hook class
1708 template<typename TRoot, typename TObject>
1709 class CIStreamObjectHook : public CSerial_FilterObjectsHook<TObject>
1710 {
1711 public:
CIStreamObjectHook(CIStreamIteratorThread_Base<TRoot,TObject> & thr)1712     CIStreamObjectHook(CIStreamIteratorThread_Base<TRoot,TObject>& thr)
1713         : m_Reader(thr)
1714     {
1715     }
1716     virtual void Process(const TObject& obj) override;
1717 private:
1718     CIStreamIteratorThread_Base<TRoot,TObject>& m_Reader;
1719 };
1720 
1721 // Helper thread class
1722 template<typename TRoot, typename TObject>
1723 class CIStreamIteratorThread_Base : public CThread
1724 {
1725 public:
CIStreamIteratorThread_Base(CObjectIStream & in,EOwnership deleteInStream)1726     CIStreamIteratorThread_Base(CObjectIStream& in, EOwnership deleteInStream)
1727         : m_In(in), m_Resume(0,1), m_Ready(0,1), m_Obj(0),
1728           m_Ownership(deleteInStream), m_Stop(false), m_Failed(false)
1729     {
1730     }
1731     // Resume thread, wait for the next object
Next(void)1732     void Next(void)
1733     {
1734         m_Obj = 0;
1735         if (!m_Stop && !m_In.EndOfData()) {
1736             m_Resume.Post();
1737             m_Ready.Wait();
1738             if (m_Failed) {
1739                 NCBI_THROW(CSerialException,eFail,
1740                              "invalid data object received");
1741             }
1742         }
1743     }
1744     // Request stop: thread is no longer needed
Stop(void)1745     void Stop(void)
1746     {
1747         m_Stop = true;
1748         m_Resume.Post();
1749         Join(0);
1750     }
Fail(void)1751     void Fail(void)
1752     {
1753         m_Failed = true;
1754         SetObject(0);
1755     }
1756     // Object is ready: suspend thread
SetObject(const TObject * obj)1757     void SetObject(const TObject* obj)
1758     {
1759         m_Obj = obj;
1760         m_Ready.Post();
1761         m_Resume.Wait();
1762         if (m_Stop) {
1763             Exit(0);
1764         }
1765     }
GetObject(void) const1766     const TObject* GetObject(void) const
1767     {
1768         return m_Obj;
1769     }
1770 protected:
~CIStreamIteratorThread_Base(void)1771     ~CIStreamIteratorThread_Base(void)
1772     {
1773         if (m_Ownership == eTakeOwnership) {
1774             delete &m_In;
1775         }
1776     }
Main(void)1777     virtual void* Main(void)
1778     {
1779         return 0;
1780     }
1781 protected:
1782     CObjectIStream&      m_In;
1783     CSemaphore           m_Resume;
1784     CSemaphore           m_Ready;
1785     const TObject*       m_Obj;
1786     EOwnership           m_Ownership;
1787     bool                 m_Stop;
1788     bool                 m_Failed;
1789 };
1790 
1791 // Reading thread for serial objects
1792 template<typename TRoot, typename TObject>
1793 class CIStreamObjectIteratorThread
1794     : public CIStreamIteratorThread_Base< TRoot,TObject >
1795 {
1796 public:
CIStreamObjectIteratorThread(CObjectIStream & in,EOwnership deleteInStream)1797     CIStreamObjectIteratorThread(CObjectIStream& in, EOwnership deleteInStream)
1798         : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1799     {
1800     }
1801 protected:
~CIStreamObjectIteratorThread(void)1802     ~CIStreamObjectIteratorThread(void)
1803     {
1804     }
Main(void)1805     virtual void* Main(void) override
1806     {
1807         this->m_Resume.Wait();
1808         // Enumerate objects of requested type
1809         try {
1810             Serial_FilterObjects< TRoot >( this->m_In,
1811                 new CIStreamObjectHook< TRoot, TObject >(*this));
1812             this->SetObject(0);
1813         } catch (CException& e) {
1814             NCBI_REPORT_EXCEPTION("In CIStreamObjectIteratorThread",e);
1815             this->Fail();
1816         }
1817         return 0;
1818     }
1819 };
1820 
1821 // Reading thread for std objects
1822 template<typename TRoot, typename TObject>
1823 class CIStreamStdIteratorThread
1824     : public CIStreamIteratorThread_Base< TRoot,TObject >
1825 {
1826 public:
CIStreamStdIteratorThread(CObjectIStream & in,EOwnership deleteInStream)1827     CIStreamStdIteratorThread(CObjectIStream& in, EOwnership deleteInStream)
1828         : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1829     {
1830     }
1831 protected:
~CIStreamStdIteratorThread(void)1832     ~CIStreamStdIteratorThread(void)
1833     {
1834     }
Main(void)1835     virtual void* Main(void) override
1836     {
1837         this->m_Resume.Wait();
1838         // Enumerate objects of requested type
1839         try {
1840             Serial_FilterStdObjects< TRoot >( this->m_In,
1841                 new CIStreamObjectHook< TRoot, TObject >(*this));
1842             this->SetObject(0);
1843         } catch (CException& e) {
1844             NCBI_REPORT_EXCEPTION("In CIStreamStdIteratorThread",e);
1845             this->Fail();
1846         }
1847         return 0;
1848     }
1849 };
1850 
1851 template<typename TRoot, typename TObject>
1852 inline
Process(const TObject & obj)1853 void CIStreamObjectHook<TRoot,TObject>::Process(const TObject& obj)
1854 {
1855     m_Reader.SetObject(&obj);
1856 }
1857 
1858 // Stream iterator base class
1859 template<typename TRoot, typename TObject>
1860 class CIStreamIterator_Base
1861 {
1862 public:
operator ++()1863     void operator++()
1864     {
1865         m_Reader->Next();
1866     }
operator ++(int)1867     void operator++(int)
1868     {
1869         m_Reader->Next();
1870     }
operator *(void) const1871     const TObject& operator* (void) const
1872     {
1873         return *(m_Reader->GetObject());
1874     }
operator ->(void) const1875     const TObject* operator-> (void) const
1876     {
1877         return m_Reader->GetObject();
1878     }
IsValid(void) const1879     bool IsValid(void) const
1880     {
1881         return m_Reader->GetObject() != 0;
1882     }
1883 protected:
CIStreamIterator_Base()1884     CIStreamIterator_Base()
1885         : m_Reader(nullptr)
1886     {
1887     }
~CIStreamIterator_Base(void)1888     ~CIStreamIterator_Base(void)
1889     {
1890         if (m_Reader) {
1891             m_Reader->Stop();
1892         }
1893     }
1894 private:
1895     // prohibit copy
1896     CIStreamIterator_Base(const CIStreamIterator_Base<TRoot,TObject>& v);
1897     // prohibit assignment
1898     CIStreamIterator_Base<TRoot,TObject>& operator=(
1899         const CIStreamIterator_Base<TRoot,TObject>& v);
1900 
1901 protected:
1902     CIStreamIteratorThread_Base< TRoot, TObject > *m_Reader;
1903 };
1904 
1905 /// Stream iterator for serial objects
1906 ///
1907 /// Usage:
1908 ///    CObjectIStream* is = CObjectIStream::Open(...);
1909 ///    CIStreamObjectIterator<CRootClass,CObjectClass> i(*is);
1910 ///    for ( ; i.IsValid(); ++i) {
1911 ///        const CObjectClass& obj = *i;
1912 ///        ...
1913 ///    }
1914 /// IMPORTANT:
1915 ///     This API requires multi-threading!
1916 
1917 template<typename TRoot, typename TObject>
1918 class CIStreamObjectIterator
1919     : public CIStreamIterator_Base< TRoot, TObject>
1920 {
1921 public:
CIStreamObjectIterator(CObjectIStream & in,EOwnership deleteInStream=eNoOwnership)1922     CIStreamObjectIterator(CObjectIStream& in, EOwnership deleteInStream = eNoOwnership)
1923     {
1924         // Create reading thread, wait until it finds the next object
1925         this->m_Reader =
1926             new CIStreamObjectIteratorThread< TRoot, TObject >(in, deleteInStream);
1927         this->m_Reader->Run();
1928         this->m_Reader->Next();
1929     }
~CIStreamObjectIterator(void)1930     ~CIStreamObjectIterator(void)
1931     {
1932     }
1933 };
1934 
1935 /// Stream iterator for standard type objects
1936 ///
1937 /// Usage:
1938 ///    CObjectIStream* is = CObjectIStream::Open(...);
1939 ///    CIStreamStdIterator<CRootClass,string> i(*is);
1940 ///    for ( ; i.IsValid(); ++i) {
1941 ///        const string& obj = *i;
1942 ///        ...
1943 ///    }
1944 /// IMPORTANT:
1945 ///     This API requires multi-threading!
1946 
1947 template<typename TRoot, typename TObject>
1948 class CIStreamStdIterator
1949     : public CIStreamIterator_Base< TRoot, TObject>
1950 {
1951 public:
CIStreamStdIterator(CObjectIStream & in,EOwnership deleteInStream=eNoOwnership)1952     CIStreamStdIterator(CObjectIStream& in, EOwnership deleteInStream = eNoOwnership)
1953     {
1954         // Create reading thread, wait until it finds the next object
1955         this->m_Reader =
1956             new CIStreamStdIteratorThread< TRoot, TObject >(in,deleteInStream);
1957         this->m_Reader->Run();
1958         this->m_Reader->Next();
1959     }
~CIStreamStdIterator(void)1960     ~CIStreamStdIterator(void)
1961     {
1962     }
1963 };
1964 
1965 #endif // NCBI_THREADS
1966 
1967 
1968 /* @} */
1969 
1970 END_NCBI_SCOPE
1971 
1972 #endif  /* STREAMITER__HPP */
1973