1 /* Copyright (C) 2013-2019 Codership Oy <info@codership.com> */
2 /*!
3  * @file common RecordSet interface
4  *
5  * Record set is a collection of serialized records of the same type.
6  *
7  * It stores them in an iovec-like collection of buffers before sending
8  * and restores from a single buffer when receiving.
9  *
10  * $Id$
11  */
12 
13 #ifndef _GU_RSET_HPP_
14 #define _GU_RSET_HPP_
15 
#include "gu_vector.hpp"
#include "gu_alloc.hpp"
#include "gu_digest.hpp"

#include "gu_limits.h" // GU_MIN_ALIGNMENT

#ifdef GU_RSET_CHECK_SIZE
#  include "gu_throw.hpp"
#endif

#include <algorithm> // std::copy
#include <cstring>   // memcpy
#include <string>
#include <utility>   // std::pair
27 
28 namespace gu {
29 
30 class RecordSet
31 {
32 public:
33 
34     enum Version
35     {
36         EMPTY = 0,
37         VER1,
38         VER2
39     };
40 
41     static Version const MAX_VERSION    = VER2;
42     static int     const VER2_ALIGNMENT = GU_MIN_ALIGNMENT;
43 
44     enum CheckType
45     {
46         CHECK_NONE   = 0,
47         CHECK_MMH32,
48         CHECK_MMH64,
49         CHECK_MMH128
50     };
51 
52     static int check_size(CheckType ct);
53 
54     /*! return net, payload size of a RecordSet */
size() const55     size_t size() const  { return size_; }
56 
57     /*! return total, padded size of a RecordSet */
serial_size() const58     size_t serial_size() const  { return GU_ALIGN(size_, alignment_); }
59 
60     /*! return number of records in the record set */
count() const61     int    count() const { return count_; }
62 
version() const63     Version   version()    const { return Version(version_); }
check_type() const64     CheckType check_type() const { return CheckType(check_type_); }
65 
66     /*! return alignment of the records */
alignment() const67     int       alignment()  const { return alignment_; };
68 
69     typedef gu::Vector<gu::Buf, 16> GatherVector;
70 
71 protected:
72 
73     ssize_t   size_;
74     int       count_;
75 
76 private:
77 
78     byte_t    version_;
79     byte_t    check_type_;
80     byte_t    alignment_;
81 
82 protected:
83     /* ctor for RecordSetOut */
84     RecordSet (Version const version, CheckType const ct);
85 
86     /* ctor for RecordSetIn */
RecordSet()87     RecordSet ()
88         : size_      (0),
89           count_     (0),
90           version_   (EMPTY),
91           check_type_(CHECK_NONE),
92           alignment_ (Version(0))
93     {}
94 
95     void init (const byte_t* buf, ssize_t size);
96 
~RecordSet()97     ~RecordSet() {}
98 };
99 
100 /*! specialization of Vector::serialize() method */
101 template<> inline RecordSet::GatherVector::size_type
serialize(void * const buf,size_type const buf_size,size_type const offset)102 RecordSet::GatherVector::serialize(void*     const buf,
103                                    size_type const buf_size,
104                                    size_type const offset /* = 0 */)
105 {
106     byte_t*       to (static_cast<byte_t*>(buf) + offset);
107     byte_t* const end(static_cast<byte_t*>(buf) + buf_size);
108     for (size_type i(0); i < size(); ++i)
109     {
110         const gu::Buf& f((*this)[i]);
111         if (to + f.size > end)
112         {
113             gu_throw_fatal << "attempt to write beyond buffer boundary";
114         }
115         const gu::byte_t* from(static_cast<const gu::byte_t*>(f.ptr));
116         to = std::copy(from, from + f.size, to);
117     }
118     return to - static_cast<byte_t*>(buf);
119 }
120 
121 
122 #if defined(__GNUG__)
123 # if (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
124 #  pragma GCC diagnostic push
125 # endif // (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
126 # pragma GCC diagnostic ignored "-Weffc++"
127 #endif
128 
129 /*! class to store records in buffer(s) to send out */
130 class RecordSetOutBase : public RecordSet
131 {
132 public:
133 
134     typedef Allocator::BaseName BaseName;
135 
136     /*! return number of disjoint pages in the record set */
page_count() const137     ssize_t page_count() const { return bufs_->size() + padding_page_needed(); }
138 
139     /*! return vector of RecordSet fragments in adjusent order */
140     ssize_t gather (GatherVector& out);
141 
142 protected:
143 
RecordSetOutBase()144     RecordSetOutBase() : RecordSet() {}
145 
146     RecordSetOutBase (byte_t*           reserved,
147                       size_t            reserved_size,
148                       const BaseName&   base_name,     /* basename for on-disk
149                                                         * allocator */
150                       CheckType         ct,
151                       Version           version  = MAX_VERSION
152 #ifdef GU_RSET_CHECK_SIZE
153                       ,ssize_t          max_size = 0x7fffffff
154 #endif
155         );
156 
157     /* this is to emulate partial specialization of function template through
158      * overloading by parameter */
159     template <bool store> struct HasPtr{};
160 
161     /* variant for classes that don't provide ptr() method and need to be
162      * explicitly serialized to internal storage */
163     template <class R>
process(const R & record,const byte_t * & ptr,bool & new_page,size_t const size,bool,HasPtr<false>)164     void process (const R&       record,
165                   const byte_t*& ptr,
166                   bool&          new_page,
167                   size_t const   size,
168                   bool,
169                   HasPtr<false>)
170     {
171         byte_t* const dst(alloc(size, new_page));
172 
173         ptr = dst;
174 
175 #ifdef NDEBUG
176         record.serialize_to (dst, size);
177 #else
178         size_t const ssize (record.serialize_to (dst, size));
179         assert (ssize == size);
180 #endif
181     }
182 
183     /* variant for classes that have ptr() method and can be either serialized
184      * or referenced */
185     template <class R>
process(const R & record,const byte_t * & ptr,bool & new_page,size_t const size,bool const store,HasPtr<true>)186     void process (const R&       record,
187                   const byte_t*& ptr,
188                   bool&          new_page,
189                   size_t const   size,
190                   bool const     store,
191                   HasPtr<true>)
192     {
193         if (store)
194         {
195             process (record, ptr, new_page, size, true, HasPtr<false>());
196         }
197         else
198         {
199             ptr = record.ptr();
200             new_page = true;
201         }
202     }
203 
204     template <class R, bool has_ptr>
205     std::pair<const byte_t*, size_t>
append_base(const R & record,bool const store=true,bool const new_record=true)206     append_base (const R& record,
207                  bool const store = true,
208                  bool const new_record = true)
209     {
210         ssize_t const size (record.serial_size());
211 
212 #ifdef GU_RSET_CHECK_SIZE
213         if (gu_unlikely(size > max_size_ - size_)) gu_throw_error(EMSGSIZE);
214 #endif
215 
216         bool new_page;
217         const byte_t* ptr;
218 
219         process (record, ptr, new_page, size, store, HasPtr<has_ptr>());
220 
221         prev_stored_ = store;
222         // make sure there is at least one record
223         count_ += new_record || (0 == count_);
224 
225         post_append (new_page, ptr, size);
226 
227         size_ += size;
228 
229         return std::pair<const byte_t*, size_t>(ptr, size);
230     }
231 
232 private:
233 
234 #ifdef GU_RSET_CHECK_SIZE
235     ssize_t const max_size_;
236 #endif
237     Allocator     alloc_;
238     Hash          check_;
239     Vector<Buf, Allocator::INITIAL_VECTOR_SIZE> bufs_;
240     bool          prev_stored_;
241 
padding_page_needed() const242     inline bool padding_page_needed() const
243     {
244         return (size_ % alignment());
245     }
246 
247     inline byte_t*
alloc(size_t const size,bool & new_page)248     alloc(size_t const size, bool& new_page)
249     {
250         byte_t* const ret(alloc_.alloc (size, new_page));
251         new_page = (new_page || !prev_stored_);
252         return ret;
253     }
254 
255     inline void
post_alloc(bool const new_page,const byte_t * const ptr,ssize_t const size)256     post_alloc (bool const          new_page,
257                 const byte_t* const ptr,
258                 ssize_t const       size)
259     {
260         if (new_page)
261         {
262             Buf b = { ptr, size };
263             bufs_->push_back (b);
264         }
265         else
266         {
267             bufs_->back().size += size;
268         }
269     }
270 
271     inline void
post_append(bool const new_page,const byte_t * const ptr,ssize_t const size)272     post_append (bool const          new_page,
273                  const byte_t* const ptr,
274                  ssize_t const       size)
275     {
276         check_.append (ptr, size);
277         post_alloc (new_page, ptr, size);
278     }
279 
280 
281     int header_size     () const;
282     int header_size_max () const;
283 
284     /* Writes the header to the end of provided buffer, returns header
285      * offset from ptr */
286     ssize_t write_header (byte_t* ptr, ssize_t size);
287 };
288 
289 
290 /*! This is a small wrapper template for RecordSetOutBase to avoid templating
291  *  the whole thing instead of just the two append methods. */
292 template <class R>
293 class RecordSetOut : public RecordSetOutBase
294 {
295 public:
296 
297     typedef RecordSetOutBase::BaseName BaseName;
298 
RecordSetOut()299     RecordSetOut() : RecordSetOutBase() {}
300 
RecordSetOut(byte_t * reserved,size_t reserved_size,const BaseName & base_name,CheckType ct,Version version=MAX_VERSION,ssize_t max_size=0x7fffffff)301     RecordSetOut (byte_t*             reserved,
302                   size_t              reserved_size,
303                   const BaseName&     base_name,
304                   CheckType           ct,
305                   Version             version  = MAX_VERSION
306 #ifdef GU_RSET_CHECK_SIZE
307                   ,ssize_t            max_size = 0x7fffffff
308 #endif
309         )
310         : RecordSetOutBase (reserved, reserved_size, base_name, ct, version
311 #ifdef GU_RSET_CHECK_SIZE
312                             ,max_size
313 #endif
314             )
315     {}
316 
317     std::pair<const byte_t*, size_t>
append(const R & r)318     append (const R& r)
319     {
320         return append_base<R, false> (r);
321 //        return append_base<R> (r); old append_base() method
322     }
323 
324     std::pair<const byte_t*, size_t>
append(const void * const src,ssize_t const size,bool const store=true,bool const new_record=true)325     append (const void* const src, ssize_t const size,
326             bool const store = true, bool const new_record = true)
327     {
328         assert (src);
329         assert (size);
330 
331         BufWrap bw (src, size);
332         return append_base<BufWrap, true> (bw, store, new_record);
333 //        return append_base (src, size, store); - old append_base() method
334     }
335 
336 private:
337 
338     /*! a wrapper class to represent ptr and size as a serializable object:
339      *  simply defines serial_size(), ptr() and serialize_to() methods */
340     class BufWrap
341     {
342         const byte_t* const ptr_;
343         size_t const        size_;
344 
345     public:
346 
BufWrap(const void * const ptr,size_t const size)347         BufWrap (const void* const ptr, size_t const size)
348             : ptr_(reinterpret_cast<const byte_t*>(ptr)), size_(size)
349         {}
350 
serial_size() const351         size_t serial_size() const { return size_; }
ptr() const352         const byte_t* ptr()  const { return ptr_;  }
353 
serialize_to(byte_t * const dst,size_t) const354         size_t serialize_to (byte_t* const dst, size_t) const
355         {
356             ::memcpy (dst, ptr_, size_);
357             return size_;
358         }
359     };
360 
361     RecordSetOut (const RecordSetOut&);
362     RecordSetOut& operator = (const RecordSetOut&);
363 
364 }; /* class RecordSetOut */
365 
366 
367 /*! class to recover records from a buffer */
368 class RecordSetInBase : public RecordSet
369 {
370 public:
371 
372     RecordSetInBase (const byte_t* buf,/* pointer to the beginning of buffer */
373                      size_t        size,             /* total size of buffer */
374                      bool          check_now = true);        /* checksum now */
375 
376     /* this is a "delayed constructor", for the object created empty */
377     void init (const byte_t* buf,      /* pointer to the beginning of buffer */
378                size_t        size,                   /* total size of buffer */
379                bool          check_now = true);              /* checksum now */
380 
381 
rewind() const382     void rewind() const { next_ = begin_; }
383 
384     void checksum() const; // throws if checksum fails
385 
386     uint64_t get_checksum() const;
387 
buf() const388     gu::Buf buf() const
389     {
390         gu::Buf ret = { head_, ssize_t(serial_size()) }; return ret;
391     }
392 
393 protected:
394 
395     template <class R>
next_base(Buf & n) const396     void next_base (Buf& n) const
397     {
398         if (gu_likely (next_ < size_))
399         {
400             size_t const next_size(R::serial_size(head_ + next_, size_ -next_));
401 
402             /* sanity check */
403             if (gu_likely (next_ + next_size <= size_t(size_)))
404             {
405                 n.ptr  = head_ + next_;
406                 n.size = next_size;
407                 next_ += next_size;
408                 return;
409             }
410 
411             throw_error (E_FAULT);
412         }
413 
414         assert (next_ == size_);
415 
416         throw_error (E_PERM);
417     }
418 
419     template <class R>
next_base() const420     R next_base () const
421     {
422         if (gu_likely (next_ < size_))
423         {
424             R const      rec(head_ + next_, size_ - next_);
425             size_t const tmp_size(rec.serial_size());
426 
427             /* sanity check */
428             if (gu_likely (next_ + tmp_size <= size_t(size_)))
429             {
430                 next_ += tmp_size;
431                 return rec;
432             }
433 
434             throw_error (E_FAULT);
435         }
436 
437         assert (next_ == size_);
438 
439         throw_error (E_PERM);
440     }
441 
442 private:
443 
444     const byte_t*   head_;        /* pointer to header        */
445     ssize_t mutable next_;        /* offset to next record    */
446     short           begin_;       /* offset to first record   */
447     /* size_ from parent class is offset past all records */
448 
449     /* takes total size of the supplied buffer */
450     void parse_header_v1_2 (size_t size);
451 
452     enum Error
453     {
454         E_PERM,
455         E_FAULT
456     };
457 
458     GU_NORETURN void throw_error (Error code) const;
459 
460     /* shallow copies here - we're not allocating anything */
RecordSetInBase(const RecordSetInBase & r)461     RecordSetInBase (const RecordSetInBase& r)
462     :
463     RecordSet   (r),
464     head_       (r.head_),
465     next_       (r.next_),
466     begin_      (r.begin_)
467     {}
468 
469     RecordSetInBase& operator= (const RecordSetInBase r);
470 #if 0
471     {
472         std::swap(head_, r.head_);
473         std::swap(next_, r.next_);
474         std::swap(begin, r.begin_);
475     }
476 #endif
477 }; /* class RecordSetInBase */
478 
479 
480 /*! This is a small wrapper template for RecordSetInBase to avoid templating
481  *  the whole thing instead of just the two next methods. */
482 template <class R>
483 class RecordSetIn : public RecordSetInBase
484 {
485 public:
486 
RecordSetIn(const void * buf,size_t size,bool check_first=true)487     RecordSetIn (const void* buf,/* pointer to the beginning of buffer */
488                  size_t      size,             /* total size of buffer */
489                  bool        check_first = true)       /* checksum now */
490         :
491         RecordSetInBase (reinterpret_cast<const byte_t*>(buf),
492                          size, check_first)
493     {}
494 
RecordSetIn()495     RecordSetIn () : RecordSetInBase (NULL, 0, false) {}
496 
next(Buf & n) const497     void next (Buf& n) const { next_base<R> (n); }
498 
next() const499     R next () const { return next_base<R> (); }
500 }; /* class RecordSetIn */
501 
502 #if defined(__GNUG__)
503 # if (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
504 #  pragma GCC diagnostic pop
505 # endif // (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
506 #endif
507 
508 } /* namespace gu */
509 
510 #endif /* _GU_RSET_HPP_ */
511