1 /**************************************************************************
2  *
3  * Copyright 2015 VMware, Inc
4  * Copyright 2011 Zack Rusin
5  * All Rights Reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  *
25  **************************************************************************/
26 
27 
28 #include "trace_ostream.hpp"
29 
30 #include <fstream>
31 
32 #include <assert.h>
33 #include <string.h>
34 
35 #include <snappy.h>
36 
37 #include "os.hpp"
38 #include "trace_snappy.hpp"
39 
40 
/* Size in bytes (1 MiB) of the uncompressed staging cache; each snappy chunk
 * written to the file holds at most this much input. */
#define SNAPPY_CHUNK_SIZE (1024 * 1024)
42 
43 
44 using namespace trace;
45 
46 
47 class SnappyOutStream : public OutStream {
48 public:
49     SnappyOutStream(const char *filename);
50     ~SnappyOutStream();
51 
52     SnappyOutStream(void);
53     bool write(const void *buffer, size_t length) override;
54     void flush(void) override;
isOpen(void)55     bool isOpen(void) {
56         return m_stream.is_open();
57     }
58 
59 
60 private:
61     void close(void);
62 
usedCacheSize(void) const63     inline size_t usedCacheSize(void) const
64     {
65         assert(m_cachePtr >= m_cache);
66         return m_cachePtr - m_cache;
67     }
freeCacheSize(void) const68     inline size_t freeCacheSize(void) const
69     {
70         assert(m_cacheSize >= usedCacheSize());
71         if (m_cacheSize > 0) {
72             return m_cacheSize - usedCacheSize();
73         } else {
74             return 0;
75         }
76     }
endOfData(void) const77     inline bool endOfData(void) const
78     {
79         return m_stream.eof() && freeCacheSize() == 0;
80     }
81     void flushWriteCache(void);
82     void createCache(size_t size);
83     void writeCompressedLength(size_t length);
84 private:
85     std::ofstream m_stream;
86     size_t m_cacheMaxSize;
87     size_t m_cacheSize;
88     char *m_cache;
89     char *m_cachePtr;
90 
91     char *m_compressedCache;
92 };
93 
SnappyOutStream(const char * filename)94 SnappyOutStream::SnappyOutStream(const char *filename)
95     : m_cacheMaxSize(SNAPPY_CHUNK_SIZE),
96       m_cacheSize(m_cacheMaxSize),
97       m_cache(new char [m_cacheMaxSize]),
98       m_cachePtr(m_cache)
99 {
100     size_t maxCompressedLength =
101         snappy::MaxCompressedLength(SNAPPY_CHUNK_SIZE);
102     m_compressedCache = new char[maxCompressedLength];
103 
104     std::ios_base::openmode fmode = std::fstream::binary
105                                   | std::fstream::out
106                                   | std::fstream::trunc;
107     m_stream.open(filename, fmode);
108     if (m_stream.is_open()) {
109         m_stream << SNAPPY_BYTE1;
110         m_stream << SNAPPY_BYTE2;
111         m_stream.flush();
112     }
113 }
114 
~SnappyOutStream()115 SnappyOutStream::~SnappyOutStream()
116 {
117     close();
118     delete [] m_compressedCache;
119     delete [] m_cache;
120 }
121 
write(const void * buffer,size_t length)122 bool SnappyOutStream::write(const void *buffer, size_t length)
123 {
124     if (freeCacheSize() > length) {
125         memcpy(m_cachePtr, buffer, length);
126         m_cachePtr += length;
127     } else if (freeCacheSize() == length) {
128         memcpy(m_cachePtr, buffer, length);
129         m_cachePtr += length;
130         flushWriteCache();
131     } else {
132         size_t sizeToWrite = length;
133 
134         while (sizeToWrite >= freeCacheSize()) {
135             size_t endSize = freeCacheSize();
136             size_t offset = length - sizeToWrite;
137             memcpy(m_cachePtr, (const char*)buffer + offset, endSize);
138             sizeToWrite -= endSize;
139             m_cachePtr += endSize;
140             flushWriteCache();
141         }
142         if (sizeToWrite) {
143             size_t offset = length - sizeToWrite;
144             memcpy(m_cachePtr, (const char*)buffer + offset, sizeToWrite);
145             m_cachePtr += sizeToWrite;
146         }
147     }
148 
149     return true;
150 }
151 
close(void)152 void SnappyOutStream::close(void)
153 {
154     flushWriteCache();
155     m_stream.close();
156     delete [] m_cache;
157     m_cache = NULL;
158     m_cachePtr = NULL;
159 }
160 
flush(void)161 void SnappyOutStream::flush(void)
162 {
163     flushWriteCache();
164     m_stream.flush();
165 }
166 
flushWriteCache(void)167 void SnappyOutStream::flushWriteCache(void)
168 {
169     size_t inputLength = usedCacheSize();
170 
171     if (inputLength) {
172         size_t compressedLength;
173 
174         ::snappy::RawCompress(m_cache, inputLength,
175                               m_compressedCache, &compressedLength);
176 
177         writeCompressedLength(compressedLength);
178         m_stream.write(m_compressedCache, compressedLength);
179         m_cachePtr = m_cache;
180     }
181     assert(m_cachePtr == m_cache);
182 }
183 
writeCompressedLength(size_t length)184 void SnappyOutStream::writeCompressedLength(size_t length)
185 {
186     unsigned char buf[4];
187     buf[0] = length & 0xff; length >>= 8;
188     buf[1] = length & 0xff; length >>= 8;
189     buf[2] = length & 0xff; length >>= 8;
190     buf[3] = length & 0xff; length >>= 8;
191     assert(length == 0);
192     m_stream.write((const char *)buf, sizeof buf);
193 }
194 
195 
196 OutStream *
createSnappyStream(const char * filename)197 trace::createSnappyStream(const char *filename)
198 {
199     SnappyOutStream *outStream = new SnappyOutStream(filename);
200     if (!outStream->isOpen()) {
201         os::log("error: could not open %s for writing\n", filename);
202         delete outStream;
203         outStream = nullptr;
204     }
205 
206     return outStream;
207 }
208