/* Copyright (C) 2013-2019 Codership Oy <info@codership.com> */
/*!
 * @file common RecordSet interface
 *
 * Record set is a collection of serialized records of the same type.
 *
 * It stores them in an iovec-like collection of buffers before sending
 * and restores from a single buffer when receiving.
 *
 * $Id$
 */

#ifndef _GU_RSET_HPP_
#define _GU_RSET_HPP_

#include "gu_vector.hpp"
#include "gu_alloc.hpp"
#include "gu_digest.hpp"

#include "gu_limits.h" // GU_MIN_ALIGNMENT

#ifdef GU_RSET_CHECK_SIZE
# include "gu_throw.hpp"
#endif

#include <algorithm> // std::copy
#include <cstring>   // ::memcpy
#include <string>

namespace gu {

30 class RecordSet
31 {
32 public:
33
34 enum Version
35 {
36 EMPTY = 0,
37 VER1,
38 VER2
39 };
40
41 static Version const MAX_VERSION = VER2;
42 static int const VER2_ALIGNMENT = GU_MIN_ALIGNMENT;
43
44 enum CheckType
45 {
46 CHECK_NONE = 0,
47 CHECK_MMH32,
48 CHECK_MMH64,
49 CHECK_MMH128
50 };
51
52 static int check_size(CheckType ct);
53
54 /*! return net, payload size of a RecordSet */
size() const55 size_t size() const { return size_; }
56
57 /*! return total, padded size of a RecordSet */
serial_size() const58 size_t serial_size() const { return GU_ALIGN(size_, alignment_); }
59
60 /*! return number of records in the record set */
count() const61 int count() const { return count_; }
62
version() const63 Version version() const { return Version(version_); }
check_type() const64 CheckType check_type() const { return CheckType(check_type_); }
65
66 /*! return alignment of the records */
alignment() const67 int alignment() const { return alignment_; };
68
69 typedef gu::Vector<gu::Buf, 16> GatherVector;
70
71 protected:
72
73 ssize_t size_;
74 int count_;
75
76 private:
77
78 byte_t version_;
79 byte_t check_type_;
80 byte_t alignment_;
81
82 protected:
83 /* ctor for RecordSetOut */
84 RecordSet (Version const version, CheckType const ct);
85
86 /* ctor for RecordSetIn */
RecordSet()87 RecordSet ()
88 : size_ (0),
89 count_ (0),
90 version_ (EMPTY),
91 check_type_(CHECK_NONE),
92 alignment_ (Version(0))
93 {}
94
95 void init (const byte_t* buf, ssize_t size);
96
~RecordSet()97 ~RecordSet() {}
98 };
99
100 /*! specialization of Vector::serialize() method */
101 template<> inline RecordSet::GatherVector::size_type
serialize(void * const buf,size_type const buf_size,size_type const offset)102 RecordSet::GatherVector::serialize(void* const buf,
103 size_type const buf_size,
104 size_type const offset /* = 0 */)
105 {
106 byte_t* to (static_cast<byte_t*>(buf) + offset);
107 byte_t* const end(static_cast<byte_t*>(buf) + buf_size);
108 for (size_type i(0); i < size(); ++i)
109 {
110 const gu::Buf& f((*this)[i]);
111 if (to + f.size > end)
112 {
113 gu_throw_fatal << "attempt to write beyond buffer boundary";
114 }
115 const gu::byte_t* from(static_cast<const gu::byte_t*>(f.ptr));
116 to = std::copy(from, from + f.size, to);
117 }
118 return to - static_cast<byte_t*>(buf);
119 }
120
121
#if defined(__GNUG__)
# if (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
#  pragma GCC diagnostic push
# endif // (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
# pragma GCC diagnostic ignored "-Weffc++"
#endif

129 /*! class to store records in buffer(s) to send out */
130 class RecordSetOutBase : public RecordSet
131 {
132 public:
133
134 typedef Allocator::BaseName BaseName;
135
136 /*! return number of disjoint pages in the record set */
page_count() const137 ssize_t page_count() const { return bufs_->size() + padding_page_needed(); }
138
139 /*! return vector of RecordSet fragments in adjusent order */
140 ssize_t gather (GatherVector& out);
141
142 protected:
143
RecordSetOutBase()144 RecordSetOutBase() : RecordSet() {}
145
146 RecordSetOutBase (byte_t* reserved,
147 size_t reserved_size,
148 const BaseName& base_name, /* basename for on-disk
149 * allocator */
150 CheckType ct,
151 Version version = MAX_VERSION
152 #ifdef GU_RSET_CHECK_SIZE
153 ,ssize_t max_size = 0x7fffffff
154 #endif
155 );
156
157 /* this is to emulate partial specialization of function template through
158 * overloading by parameter */
159 template <bool store> struct HasPtr{};
160
161 /* variant for classes that don't provide ptr() method and need to be
162 * explicitly serialized to internal storage */
163 template <class R>
process(const R & record,const byte_t * & ptr,bool & new_page,size_t const size,bool,HasPtr<false>)164 void process (const R& record,
165 const byte_t*& ptr,
166 bool& new_page,
167 size_t const size,
168 bool,
169 HasPtr<false>)
170 {
171 byte_t* const dst(alloc(size, new_page));
172
173 ptr = dst;
174
175 #ifdef NDEBUG
176 record.serialize_to (dst, size);
177 #else
178 size_t const ssize (record.serialize_to (dst, size));
179 assert (ssize == size);
180 #endif
181 }
182
183 /* variant for classes that have ptr() method and can be either serialized
184 * or referenced */
185 template <class R>
process(const R & record,const byte_t * & ptr,bool & new_page,size_t const size,bool const store,HasPtr<true>)186 void process (const R& record,
187 const byte_t*& ptr,
188 bool& new_page,
189 size_t const size,
190 bool const store,
191 HasPtr<true>)
192 {
193 if (store)
194 {
195 process (record, ptr, new_page, size, true, HasPtr<false>());
196 }
197 else
198 {
199 ptr = record.ptr();
200 new_page = true;
201 }
202 }
203
204 template <class R, bool has_ptr>
205 std::pair<const byte_t*, size_t>
append_base(const R & record,bool const store=true,bool const new_record=true)206 append_base (const R& record,
207 bool const store = true,
208 bool const new_record = true)
209 {
210 ssize_t const size (record.serial_size());
211
212 #ifdef GU_RSET_CHECK_SIZE
213 if (gu_unlikely(size > max_size_ - size_)) gu_throw_error(EMSGSIZE);
214 #endif
215
216 bool new_page;
217 const byte_t* ptr;
218
219 process (record, ptr, new_page, size, store, HasPtr<has_ptr>());
220
221 prev_stored_ = store;
222 // make sure there is at least one record
223 count_ += new_record || (0 == count_);
224
225 post_append (new_page, ptr, size);
226
227 size_ += size;
228
229 return std::pair<const byte_t*, size_t>(ptr, size);
230 }
231
232 private:
233
234 #ifdef GU_RSET_CHECK_SIZE
235 ssize_t const max_size_;
236 #endif
237 Allocator alloc_;
238 Hash check_;
239 Vector<Buf, Allocator::INITIAL_VECTOR_SIZE> bufs_;
240 bool prev_stored_;
241
padding_page_needed() const242 inline bool padding_page_needed() const
243 {
244 return (size_ % alignment());
245 }
246
247 inline byte_t*
alloc(size_t const size,bool & new_page)248 alloc(size_t const size, bool& new_page)
249 {
250 byte_t* const ret(alloc_.alloc (size, new_page));
251 new_page = (new_page || !prev_stored_);
252 return ret;
253 }
254
255 inline void
post_alloc(bool const new_page,const byte_t * const ptr,ssize_t const size)256 post_alloc (bool const new_page,
257 const byte_t* const ptr,
258 ssize_t const size)
259 {
260 if (new_page)
261 {
262 Buf b = { ptr, size };
263 bufs_->push_back (b);
264 }
265 else
266 {
267 bufs_->back().size += size;
268 }
269 }
270
271 inline void
post_append(bool const new_page,const byte_t * const ptr,ssize_t const size)272 post_append (bool const new_page,
273 const byte_t* const ptr,
274 ssize_t const size)
275 {
276 check_.append (ptr, size);
277 post_alloc (new_page, ptr, size);
278 }
279
280
281 int header_size () const;
282 int header_size_max () const;
283
284 /* Writes the header to the end of provided buffer, returns header
285 * offset from ptr */
286 ssize_t write_header (byte_t* ptr, ssize_t size);
287 };
288
289
290 /*! This is a small wrapper template for RecordSetOutBase to avoid templating
291 * the whole thing instead of just the two append methods. */
292 template <class R>
293 class RecordSetOut : public RecordSetOutBase
294 {
295 public:
296
297 typedef RecordSetOutBase::BaseName BaseName;
298
RecordSetOut()299 RecordSetOut() : RecordSetOutBase() {}
300
RecordSetOut(byte_t * reserved,size_t reserved_size,const BaseName & base_name,CheckType ct,Version version=MAX_VERSION,ssize_t max_size=0x7fffffff)301 RecordSetOut (byte_t* reserved,
302 size_t reserved_size,
303 const BaseName& base_name,
304 CheckType ct,
305 Version version = MAX_VERSION
306 #ifdef GU_RSET_CHECK_SIZE
307 ,ssize_t max_size = 0x7fffffff
308 #endif
309 )
310 : RecordSetOutBase (reserved, reserved_size, base_name, ct, version
311 #ifdef GU_RSET_CHECK_SIZE
312 ,max_size
313 #endif
314 )
315 {}
316
317 std::pair<const byte_t*, size_t>
append(const R & r)318 append (const R& r)
319 {
320 return append_base<R, false> (r);
321 // return append_base<R> (r); old append_base() method
322 }
323
324 std::pair<const byte_t*, size_t>
append(const void * const src,ssize_t const size,bool const store=true,bool const new_record=true)325 append (const void* const src, ssize_t const size,
326 bool const store = true, bool const new_record = true)
327 {
328 assert (src);
329 assert (size);
330
331 BufWrap bw (src, size);
332 return append_base<BufWrap, true> (bw, store, new_record);
333 // return append_base (src, size, store); - old append_base() method
334 }
335
336 private:
337
338 /*! a wrapper class to represent ptr and size as a serializable object:
339 * simply defines serial_size(), ptr() and serialize_to() methods */
340 class BufWrap
341 {
342 const byte_t* const ptr_;
343 size_t const size_;
344
345 public:
346
BufWrap(const void * const ptr,size_t const size)347 BufWrap (const void* const ptr, size_t const size)
348 : ptr_(reinterpret_cast<const byte_t*>(ptr)), size_(size)
349 {}
350
serial_size() const351 size_t serial_size() const { return size_; }
ptr() const352 const byte_t* ptr() const { return ptr_; }
353
serialize_to(byte_t * const dst,size_t) const354 size_t serialize_to (byte_t* const dst, size_t) const
355 {
356 ::memcpy (dst, ptr_, size_);
357 return size_;
358 }
359 };
360
361 RecordSetOut (const RecordSetOut&);
362 RecordSetOut& operator = (const RecordSetOut&);
363
364 }; /* class RecordSetOut */
365
366
367 /*! class to recover records from a buffer */
368 class RecordSetInBase : public RecordSet
369 {
370 public:
371
372 RecordSetInBase (const byte_t* buf,/* pointer to the beginning of buffer */
373 size_t size, /* total size of buffer */
374 bool check_now = true); /* checksum now */
375
376 /* this is a "delayed constructor", for the object created empty */
377 void init (const byte_t* buf, /* pointer to the beginning of buffer */
378 size_t size, /* total size of buffer */
379 bool check_now = true); /* checksum now */
380
381
rewind() const382 void rewind() const { next_ = begin_; }
383
384 void checksum() const; // throws if checksum fails
385
386 uint64_t get_checksum() const;
387
buf() const388 gu::Buf buf() const
389 {
390 gu::Buf ret = { head_, ssize_t(serial_size()) }; return ret;
391 }
392
393 protected:
394
395 template <class R>
next_base(Buf & n) const396 void next_base (Buf& n) const
397 {
398 if (gu_likely (next_ < size_))
399 {
400 size_t const next_size(R::serial_size(head_ + next_, size_ -next_));
401
402 /* sanity check */
403 if (gu_likely (next_ + next_size <= size_t(size_)))
404 {
405 n.ptr = head_ + next_;
406 n.size = next_size;
407 next_ += next_size;
408 return;
409 }
410
411 throw_error (E_FAULT);
412 }
413
414 assert (next_ == size_);
415
416 throw_error (E_PERM);
417 }
418
419 template <class R>
next_base() const420 R next_base () const
421 {
422 if (gu_likely (next_ < size_))
423 {
424 R const rec(head_ + next_, size_ - next_);
425 size_t const tmp_size(rec.serial_size());
426
427 /* sanity check */
428 if (gu_likely (next_ + tmp_size <= size_t(size_)))
429 {
430 next_ += tmp_size;
431 return rec;
432 }
433
434 throw_error (E_FAULT);
435 }
436
437 assert (next_ == size_);
438
439 throw_error (E_PERM);
440 }
441
442 private:
443
444 const byte_t* head_; /* pointer to header */
445 ssize_t mutable next_; /* offset to next record */
446 short begin_; /* offset to first record */
447 /* size_ from parent class is offset past all records */
448
449 /* takes total size of the supplied buffer */
450 void parse_header_v1_2 (size_t size);
451
452 enum Error
453 {
454 E_PERM,
455 E_FAULT
456 };
457
458 GU_NORETURN void throw_error (Error code) const;
459
460 /* shallow copies here - we're not allocating anything */
RecordSetInBase(const RecordSetInBase & r)461 RecordSetInBase (const RecordSetInBase& r)
462 :
463 RecordSet (r),
464 head_ (r.head_),
465 next_ (r.next_),
466 begin_ (r.begin_)
467 {}
468
469 RecordSetInBase& operator= (const RecordSetInBase r);
470 #if 0
471 {
472 std::swap(head_, r.head_);
473 std::swap(next_, r.next_);
474 std::swap(begin, r.begin_);
475 }
476 #endif
477 }; /* class RecordSetInBase */
478
479
480 /*! This is a small wrapper template for RecordSetInBase to avoid templating
481 * the whole thing instead of just the two next methods. */
482 template <class R>
483 class RecordSetIn : public RecordSetInBase
484 {
485 public:
486
RecordSetIn(const void * buf,size_t size,bool check_first=true)487 RecordSetIn (const void* buf,/* pointer to the beginning of buffer */
488 size_t size, /* total size of buffer */
489 bool check_first = true) /* checksum now */
490 :
491 RecordSetInBase (reinterpret_cast<const byte_t*>(buf),
492 size, check_first)
493 {}
494
RecordSetIn()495 RecordSetIn () : RecordSetInBase (NULL, 0, false) {}
496
next(Buf & n) const497 void next (Buf& n) const { next_base<R> (n); }
498
next() const499 R next () const { return next_base<R> (); }
500 }; /* class RecordSetIn */
501
#if defined(__GNUG__)
# if (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
#  pragma GCC diagnostic pop
# endif // (__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || (__GNUC__ > 4)
#endif

} /* namespace gu */

#endif /* _GU_RSET_HPP_ */