1 /*!
2  *  Copyright (c) 2015 by Contributors
3  * \file io.h
4  * \brief defines serializable interface of dmlc
5  */
6 #ifndef DMLC_IO_H_
7 #define DMLC_IO_H_
8 #include <cstdio>
9 #include <string>
10 #include <cstring>
11 #include <vector>
12 #include <istream>
13 #include <ostream>
14 #include <streambuf>
15 #include "./logging.h"
16 
17 // include uint64_t only to make io standalone
18 #ifdef _MSC_VER
19 /*! \brief uint64 */
20 typedef unsigned __int64 uint64_t;
21 #else
22 #include <inttypes.h>
23 #endif
24 
25 /*! \brief namespace for dmlc */
26 namespace dmlc {
27 /*!
28  * \brief interface of stream I/O for serialization
29  */
class Stream {  // NOLINT(*)
 public:
  /*!
   * \brief reads raw bytes from the stream
   * \param ptr pointer to a memory buffer able to hold at least `size` bytes
   * \param size maximum number of bytes to read
   * \return the number of bytes actually read
   */
  virtual size_t Read(void *ptr, size_t size) = 0;
  /*!
   * \brief writes raw bytes to the stream
   * \param ptr pointer to a memory buffer
   * \param size number of bytes to write
   */
  virtual void Write(const void *ptr, size_t size) = 0;
  /*! \brief virtual destructor, so streams can be deleted through a Stream pointer */
  virtual ~Stream(void) {}
  /*!
   * \brief generic factory function:
   *  create a stream; the stream closes the underlying file upon deletion
   *
   * \param uri the uri of the input; currently supported schemes are
   *            hdfs://, s3://, and file://; file:// is used by default
   * \param flag open mode, can be "w", "r", "a"
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; can be NULL when allow_null == true
   *         and the file does not exist
   */
  static Stream *Create(const char *uri,
                        const char* const flag,
                        bool allow_null = false);
  // helper functions to write/read different data structures
  /*!
   * \brief writes a value to the stream.
   *
   * dmlc::Stream supports Write/Read of most STL composites and base types.
   * If the data type is not supported, a compile time error will be issued.
   *
   * This function is endian-aware;
   * the output endianness is defined by DMLC_IO_USE_LITTLE_ENDIAN
   *
   * \param data data to be written
   * \tparam T the data type to be written
   */
  template<typename T>
  inline void Write(const T &data);
  /*!
   * \brief reads a value from the stream.
   *
   * dmlc::Stream supports Write/Read of most STL composites and base types.
   * If the data type is not supported, a compile time error will be issued.
   *
   * This function is endian-aware;
   * the input endianness is defined by DMLC_IO_USE_LITTLE_ENDIAN
   *
   * \param out_data placeholder for the data to be deserialized
   * \tparam T the data type to be read
   * \return whether the load was successful
   */
  template<typename T>
  inline bool Read(T *out_data);
  /*!
   * \brief endian-aware write of an array of data,
   *  performed element by element through Write<T>.
   * \param data The data pointer
   * \param num_elems Number of elements
   * \tparam T the data type.
   */
  template<typename T>
  inline void WriteArray(const T* data, size_t num_elems);
  /*!
   * \brief endian-aware read of an array of data,
   *  performed element by element through Read<T>;
   *  reading stops at the first element that fails to load.
   * \param data The data pointer; must have room for num_elems elements
   * \param num_elems Number of elements
   * \tparam T the data type.
   * \return whether all elements were loaded successfully
   */
  template<typename T>
  inline bool ReadArray(T* data, size_t num_elems);
};
107 
108 /*! \brief interface of i/o stream that support seek */
class SeekStream: public Stream {
 public:
  // virtual destructor
  virtual ~SeekStream(void) {}
  /*!
   * \brief seek to a certain position of the file
   *  NOTE(review): presumably an absolute byte offset from the start; confirm
   *  against the concrete implementations.
   */
  virtual void Seek(size_t pos) = 0;
  /*! \brief tell the current position of the stream */
  virtual size_t Tell(void) = 0;
  /*!
   * \brief generic factory function:
   *  create a read-only SeekStream;
   *  the stream closes the underlying file upon deletion.
   *  Unless allow_null is set, an error is reported when creation fails.
   * \param uri the uri of the input; currently supported schemes are
   *            hdfs://, s3://, and file://; file:// is used by default
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; can be NULL when allow_null == true
   *         and the file does not exist
   */
  static SeekStream *CreateForRead(const char *uri,
                                   bool allow_null = false);
};
130 
131 /*! \brief interface for serializable objects */
class Serializable {
 public:
  /*! \brief virtual destructor, so objects can be deleted through a Serializable pointer */
  virtual ~Serializable() {}
  /*!
  * \brief load the object's state (e.g. a model) from a stream
  * \param fi stream to load the state from
  */
  virtual void Load(Stream *fi) = 0;
  /*!
  * \brief save the object's state (e.g. a model) to a stream
  * \param fo stream to save the state to
  */
  virtual void Save(Stream *fo) const = 0;
};
147 
148 /*!
149  * \brief input split creates that allows reading
150  *  of records from split of data,
151  *  independent part that covers all the dataset
152  *
153  *  see InputSplit::Create for definition of record
154  */
155 class InputSplit {
156  public:
157   /*! \brief a blob of memory region */
158   struct Blob {
159     /*! \brief points to start of the memory region */
160     void *dptr;
161     /*! \brief size of the memory region */
162     size_t size;
163   };
164   /*!
165    * \brief hint the inputsplit how large the chunk size
166    *  it should return when implementing NextChunk
167    *  this is a hint so may not be enforced,
168    *  but InputSplit will try adjust its internal buffer
169    *  size to the hinted value
170    * \param chunk_size the chunk size
171    */
HintChunkSize(size_t chunk_size)172   virtual void HintChunkSize(size_t chunk_size) {}
173   /*! \brief get the total size of the InputSplit */
174   virtual size_t GetTotalSize(void) = 0;
175   /*! \brief reset the position of InputSplit to beginning */
176   virtual void BeforeFirst(void) = 0;
177   /*!
178    * \brief get the next record, the returning value
179    *   is valid until next call to NextRecord, NextChunk or NextBatch
180    *   caller can modify the memory content of out_rec
181    *
182    *   For text, out_rec contains a single line
183    *   For recordio, out_rec contains one record content(with header striped)
184    *
185    * \param out_rec used to store the result
186    * \return true if we can successfully get next record
187    *     false if we reached end of split
188    * \sa InputSplit::Create for definition of record
189    */
190   virtual bool NextRecord(Blob *out_rec) = 0;
191   /*!
192    * \brief get a chunk of memory that can contain multiple records,
193    *  the caller needs to parse the content of the resulting chunk,
194    *  for text file, out_chunk can contain data of multiple lines
195    *  for recordio, out_chunk can contain multiple records(including headers)
196    *
197    *  This function ensures there won't be partial record in the chunk
198    *  caller can modify the memory content of out_chunk,
199    *  the memory is valid until next call to NextRecord, NextChunk or NextBatch
200    *
201    *  Usually NextRecord is sufficient, NextChunk can be used by some
202    *  multi-threaded parsers to parse the input content
203    *
204    * \param out_chunk used to store the result
205    * \return true if we can successfully get next record
206    *     false if we reached end of split
207    * \sa InputSplit::Create for definition of record
208    * \sa RecordIOChunkReader to parse recordio content from out_chunk
209    */
210   virtual bool NextChunk(Blob *out_chunk) = 0;
211   /*!
212    * \brief get a chunk of memory that can contain multiple records,
213    *  with hint for how many records is needed,
214    *  the caller needs to parse the content of the resulting chunk,
215    *  for text file, out_chunk can contain data of multiple lines
216    *  for recordio, out_chunk can contain multiple records(including headers)
217    *
218    *  This function ensures there won't be partial record in the chunk
219    *  caller can modify the memory content of out_chunk,
220    *  the memory is valid until next call to NextRecord, NextChunk or NextBatch
221    *
222    *
223    * \param out_chunk used to store the result
224    * \param n_records used as a hint for how many records should be returned, may be ignored
225    * \return true if we can successfully get next record
226    *     false if we reached end of split
227    * \sa InputSplit::Create for definition of record
228    * \sa RecordIOChunkReader to parse recordio content from out_chunk
229    */
NextBatch(Blob * out_chunk,size_t n_records)230   virtual bool NextBatch(Blob *out_chunk, size_t n_records) {
231     return NextChunk(out_chunk);
232   }
233   /*! \brief destructor*/
~InputSplit(void)234   virtual ~InputSplit(void) DMLC_THROW_EXCEPTION {}
235   /*!
236    * \brief reset the Input split to a certain part id,
237    *  The InputSplit will be pointed to the head of the new specified segment.
238    *  This feature may not be supported by every implementation of InputSplit.
239    * \param part_index The part id of the new input.
240    * \param num_parts The total number of parts.
241    */
242   virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0;
243   /*!
244    * \brief factory function:
245    *  create input split given a uri
246    * \param uri the uri of the input, can contain hdfs prefix
247    * \param part_index the part id of current input
248    * \param num_parts total number of splits
249    * \param type type of record
250    *   List of possible types: "text", "recordio", "indexed_recordio"
251    *     - "text":
252    *         text file, each line is treated as a record
253    *         input split will split on '\\n' or '\\r'
254    *     - "recordio":
255    *         binary recordio file, see recordio.h
256    *     - "indexed_recordio":
257    *         binary recordio file with index, see recordio.h
258    * \return a new input split
259    * \sa InputSplit::Type
260    */
261   static InputSplit* Create(const char *uri,
262                             unsigned part_index,
263                             unsigned num_parts,
264                             const char *type);
265   /*!
266    * \brief factory function:
267    *  create input split given a uri for input and index
268    * \param uri the uri of the input, can contain hdfs prefix
269    * \param index_uri the uri of the index, can contain hdfs prefix
270    * \param part_index the part id of current input
271    * \param num_parts total number of splits
272    * \param type type of record
273    *   List of possible types: "text", "recordio", "indexed_recordio"
274    *     - "text":
275    *         text file, each line is treated as a record
276    *         input split will split on '\\n' or '\\r'
277    *     - "recordio":
278    *         binary recordio file, see recordio.h
279    *     - "indexed_recordio":
280    *         binary recordio file with index, see recordio.h
281    * \param shuffle whether to shuffle the output from the InputSplit,
282    *                supported only by "indexed_recordio" type.
283    *                Defaults to "false"
284    * \param seed random seed to use in conjunction with the "shuffle"
285    *             option. Defaults to 0
286    * \param batch_size a hint to InputSplit what is the intended number
287    *                   of examples return per batch. Used only by
288    *                   "indexed_recordio" type
289    * \param recurse_directories whether to recursively traverse directories
290    * \return a new input split
291    * \sa InputSplit::Type
292    */
293   static InputSplit* Create(const char *uri,
294                             const char *index_uri,
295                             unsigned part_index,
296                             unsigned num_parts,
297                             const char *type,
298                             const bool shuffle = false,
299                             const int seed = 0,
300                             const size_t batch_size = 256,
301                             const bool recurse_directories = false);
302 };
303 
304 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
305 /*!
306  * \brief a std::ostream class that can can wrap Stream objects,
307  *  can use ostream with that output to underlying Stream
308  *
309  * Usage example:
310  * \code
311  *
312  *   Stream *fs = Stream::Create("hdfs:///test.txt", "w");
313  *   dmlc::ostream os(fs);
314  *   os << "hello world" << std::endl;
315  *   delete fs;
316  * \endcode
317  */
class ostream : public std::basic_ostream<char> {
 public:
  /*!
   * \brief construct a std::ostream that forwards its output to a dmlc::Stream
   * \param stream the Stream that receives the output
   * \param buffer_size internal streambuf size (0 is replaced by a 2-byte buffer)
   */
  explicit ostream(Stream *stream,
                   size_t buffer_size = (1 << 10))
      : std::basic_ostream<char>(NULL), buf_(buffer_size) {
    this->set_stream(stream);
  }
  // explicitly flush the buffered bytes to the underlying Stream
  virtual ~ostream() DMLC_NO_EXCEPTION {
    buf_.pubsync();
  }
  /*!
   * \brief set the internal stream to `stream` and reset the stream state
   *  (rdbuf() clears the error flags); bytes still buffered for the
   *  previous stream are flushed to it first
   * \param stream new stream as output
   */
  inline void set_stream(Stream *stream) {
    buf_.set_stream(stream);
    this->rdbuf(&buf_);
  }

  /*! \return how many bytes have been handed to the underlying Stream so far */
  inline size_t bytes_written(void) const {
    return buf_.bytes_out();
  }

 private:
  // internal streambuf: buffers characters and forwards them to a Stream
  class OutBuf : public std::streambuf {
   public:
    explicit OutBuf(size_t buffer_size)
        : stream_(NULL), buffer_(buffer_size), bytes_out_(0) {
      // keep at least two bytes: one usable slot plus the spare slot that
      // set_stream() reserves past epptr() for overflow()
      if (buffer_size == 0) buffer_.resize(2);
    }
    // set stream to the buffer, flushing any data pending for the old one
    inline void set_stream(Stream *stream);

    // number of bytes forwarded to the Stream so far
    inline size_t bytes_out() const { return bytes_out_; }
   private:
    /*! \brief target Stream (not owned), NULL until set_stream is called */
    Stream *stream_;
    /*! \brief internal buffer; the last byte is kept spare for overflow() */
    std::vector<char> buffer_;
    /*! \brief number of bytes written so far */
    size_t bytes_out_;
    // override sync: flush the put area to stream_
    inline int_type sync(void);
    // override overflow: called when the put area is full
    inline int_type overflow(int c);
  };
  /*! \brief buffer of the stream */
  OutBuf buf_;
};
375 
376 /*!
377  * \brief a std::istream class that can can wrap Stream objects,
378  *  can use istream with that output to underlying Stream
379  *
380  * Usage example:
381  * \code
382  *
383  *   Stream *fs = Stream::Create("hdfs:///test.txt", "r");
384  *   dmlc::istream is(fs);
385  *   is >> mydata;
386  *   delete fs;
387  * \endcode
388  */
class istream : public std::basic_istream<char> {
 public:
  /*!
   * \brief construct a std::istream that reads its input from a dmlc::Stream
   * \param stream the Stream to read input from
   * \param buffer_size internal buffer size (0 is replaced by a 2-byte buffer)
   */
  explicit istream(Stream *stream,
                   size_t buffer_size = (1 << 10))
      : std::basic_istream<char>(NULL), buf_(buffer_size) {
    this->set_stream(stream);
  }
  virtual ~istream() DMLC_NO_EXCEPTION {}
  /*!
   * \brief set the internal stream to `stream` and reset the stream state
   *  (rdbuf() clears the error flags); any data still buffered from the
   *  previous stream is discarded
   * \param stream new stream as input
   */
  inline void set_stream(Stream *stream) {
    buf_.set_stream(stream);
    this->rdbuf(&buf_);
  }
  /*! \return how many bytes have been read from the underlying Stream so far */
  inline size_t bytes_read(void) const {
    return buf_.bytes_read();
  }

 private:
  // internal streambuf: refills its buffer from a Stream on demand
  class InBuf : public std::streambuf {
   public:
    explicit InBuf(size_t buffer_size)
        : stream_(NULL), bytes_read_(0),
          buffer_(buffer_size) {
      // never allow an empty buffer, so &buffer_[0] is always valid
      if (buffer_size == 0) buffer_.resize(2);
    }
    // set stream to the buffer and empty the get area
    inline void set_stream(Stream *stream);
    // return how many bytes read so far
    inline size_t bytes_read(void) const {
      return bytes_read_;
    }
   private:
    /*! \brief source Stream (not owned), NULL until set_stream is called */
    Stream *stream_;
    /*! \brief how many bytes we read so far */
    size_t bytes_read_;
    /*! \brief internal buffer */
    std::vector<char> buffer_;
    // override underflow: refill the get area from stream_
    inline int_type underflow();
  };
  /*! \brief input buffer */
  InBuf buf_;
};
443 #endif
444 }  // namespace dmlc
445 
446 #include "./serializer.h"
447 
448 namespace dmlc {
449 // implementations of inline functions
450 template<typename T>
Write(const T & data)451 inline void Stream::Write(const T &data) {
452   serializer::Handler<T>::Write(this, data);
453 }
454 template<typename T>
Read(T * out_data)455 inline bool Stream::Read(T *out_data) {
456   return serializer::Handler<T>::Read(this, out_data);
457 }
458 
459 template<typename T>
WriteArray(const T * data,size_t num_elems)460 inline void Stream::WriteArray(const T* data, size_t num_elems) {
461   for (size_t i = 0; i < num_elems; ++i) {
462     this->Write<T>(data[i]);
463   }
464 }
465 
466 template<typename T>
ReadArray(T * data,size_t num_elems)467 inline bool Stream::ReadArray(T* data, size_t num_elems) {
468   for (size_t i = 0; i < num_elems; ++i) {
469     if (!this->Read<T>(data + i)) return false;
470   }
471   return true;
472 }
473 
474 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
475 // implementations for ostream
set_stream(Stream * stream)476 inline void ostream::OutBuf::set_stream(Stream *stream) {
477   if (stream_ != NULL) this->pubsync();
478   this->stream_ = stream;
479   this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1);
480 }
sync(void)481 inline int ostream::OutBuf::sync(void) {
482   if (stream_ == NULL) return -1;
483   std::ptrdiff_t n = pptr() - pbase();
484   stream_->Write(pbase(), n);
485   this->pbump(-static_cast<int>(n));
486   bytes_out_ += n;
487   return 0;
488 }
overflow(int c)489 inline int ostream::OutBuf::overflow(int c) {
490   *(this->pptr()) = c;
491   std::ptrdiff_t n = pptr() - pbase();
492   this->pbump(-static_cast<int>(n));
493   if (c == EOF) {
494     stream_->Write(pbase(), n);
495     bytes_out_ += n;
496   } else {
497     stream_->Write(pbase(), n + 1);
498     bytes_out_ += n + 1;
499   }
500   return c;
501 }
502 
503 // implementations for istream
set_stream(Stream * stream)504 inline void istream::InBuf::set_stream(Stream *stream) {
505   stream_ = stream;
506   this->setg(&buffer_[0], &buffer_[0], &buffer_[0]);
507 }
underflow()508 inline int istream::InBuf::underflow() {
509   char *bhead = &buffer_[0];
510   if (this->gptr() == this->egptr()) {
511     size_t sz = stream_->Read(bhead, buffer_.size());
512     this->setg(bhead, bhead, bhead + sz);
513     bytes_read_ += sz;
514   }
515   if (this->gptr() == this->egptr()) {
516     return traits_type::eof();
517   } else {
518     return traits_type::to_int_type(*gptr());
519   }
520 }
521 #endif
522 
523 namespace io {
524 /*! \brief common data structure for URI */
struct URI {
  /*! \brief protocol prefix including the "://" separator (e.g. "hdfs://"), empty if absent */
  std::string protocol;
  /*!
   * \brief host name: the namenode for HDFS, the bucket name for S3
   */
  std::string host;
  /*! \brief path part of the URI */
  std::string name;
  /*! \brief construct an empty URI */
  URI(void) {}
  /*!
   * \brief parse a string of the form [protocol://host]/path
   *  Without "://" the whole string becomes the path.
   */
  explicit URI(const char *uri) {
    const char *sep = std::strstr(uri, "://");
    if (sep == NULL) {
      name = uri;
      return;
    }
    const char *rest = sep + 3;   // first character after "://"
    protocol = std::string(uri, rest);
    const char *slash = std::strchr(rest, '/');
    if (slash != NULL) {
      host = std::string(rest, slash);
      name = slash;
    } else {
      // host only: default to the root path
      host = rest;
      name = '/';
    }
  }
  /*! \brief reassemble the string representation */
  inline std::string str(void) const {
    std::string out(protocol);
    out += host;
    out += name;
    return out;
  }
};
560 
561 /*! \brief type of file */
enum FileType {
  /*! \brief the entry is a regular file */
  kFile = 0,
  /*! \brief the entry is a directory */
  kDirectory = 1
};
568 
569 /*! \brief use to store file information */
570 struct FileInfo {
571   /*! \brief full path to the file */
572   URI path;
573   /*! \brief the size of the file */
574   size_t size;
575   /*! \brief the type of the file */
576   FileType type;
577   /*! \brief default constructor */
FileInfoFileInfo578   FileInfo() : size(0), type(kFile) {}
579 };
580 
581 /*! \brief file system system interface */
class FileSystem {
 public:
  /*!
   * \brief get the singleton filesystem instance matching the URI
   * \param path can be s3://..., hdfs://..., file://...,
   *            or have an empty protocol (the local filesystem is returned)
   * \return the corresponding filesystem; an error is reported if
   *         no matching filesystem can be found
   */
  static FileSystem *GetInstance(const URI &path);
  /*! \brief virtual destructor */
  virtual ~FileSystem() {}
  /*!
   * \brief get information about a path
   * \param path the path to the file
   * \return the information about the file
   */
  virtual FileInfo GetPathInfo(const URI &path) = 0;
  /*!
   * \brief list the files in a directory
   * \param path path to the directory
   * \param out_list receives information about the files found
   */
  virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
  /*!
   * \brief list the files in a directory recursively, built on ListDirectory
   * \param path path to the directory
   * \param out_list receives information about the files found
   */
  virtual void ListDirectoryRecursive(const URI &path,
                                      std::vector<FileInfo> *out_list);
  /*!
   * \brief open a stream
   * \param path path to the file
   * \param flag open mode, can be "w", "r", "a"
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; can be NULL when allow_null == true
   *         and the file does not exist
   * NOTE: default arguments of virtual functions are taken from the static
   *       type of the call expression, so overrides should keep the same default.
   */
  virtual Stream *Open(const URI &path,
                       const char* const flag,
                       bool allow_null = false) = 0;
  /*!
   * \brief open a seekable stream for reading
   * \param path the path to the file
   * \param allow_null whether NULL may be returned instead of reporting an error
   * \return the created stream; can be NULL when allow_null == true
   *         and the file does not exist
   */
  virtual SeekStream *OpenForRead(const URI &path,
                                  bool allow_null = false) = 0;
};
632 
633 }  // namespace io
634 }  // namespace dmlc
635 #endif  // DMLC_IO_H_
636