1 /**************************************************************************
2 *
3 * Copyright 2015 VMware, Inc
4 * Copyright 2011 Zack Rusin
5 * All Rights Reserved.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 * THE SOFTWARE.
24 *
25 **************************************************************************/
26
27
28 #include "trace_ostream.hpp"
29
30 #include <fstream>
31
32 #include <assert.h>
33 #include <string.h>
34
35 #include <snappy.h>
36
37 #include "os.hpp"
38 #include "trace_snappy.hpp"
39
40
// Size in bytes of the uncompressed write cache; also the input size used to
// dimension the compression output buffer.
#define SNAPPY_CHUNK_SIZE (1 * 1024 * 1024)
42
43
44 using namespace trace;
45
46
47 class SnappyOutStream : public OutStream {
48 public:
49 SnappyOutStream(const char *filename);
50 ~SnappyOutStream();
51
52 SnappyOutStream(void);
53 bool write(const void *buffer, size_t length) override;
54 void flush(void) override;
isOpen(void)55 bool isOpen(void) {
56 return m_stream.is_open();
57 }
58
59
60 private:
61 void close(void);
62
usedCacheSize(void) const63 inline size_t usedCacheSize(void) const
64 {
65 assert(m_cachePtr >= m_cache);
66 return m_cachePtr - m_cache;
67 }
freeCacheSize(void) const68 inline size_t freeCacheSize(void) const
69 {
70 assert(m_cacheSize >= usedCacheSize());
71 if (m_cacheSize > 0) {
72 return m_cacheSize - usedCacheSize();
73 } else {
74 return 0;
75 }
76 }
endOfData(void) const77 inline bool endOfData(void) const
78 {
79 return m_stream.eof() && freeCacheSize() == 0;
80 }
81 void flushWriteCache(void);
82 void createCache(size_t size);
83 void writeCompressedLength(size_t length);
84 private:
85 std::ofstream m_stream;
86 size_t m_cacheMaxSize;
87 size_t m_cacheSize;
88 char *m_cache;
89 char *m_cachePtr;
90
91 char *m_compressedCache;
92 };
93
SnappyOutStream(const char * filename)94 SnappyOutStream::SnappyOutStream(const char *filename)
95 : m_cacheMaxSize(SNAPPY_CHUNK_SIZE),
96 m_cacheSize(m_cacheMaxSize),
97 m_cache(new char [m_cacheMaxSize]),
98 m_cachePtr(m_cache)
99 {
100 size_t maxCompressedLength =
101 snappy::MaxCompressedLength(SNAPPY_CHUNK_SIZE);
102 m_compressedCache = new char[maxCompressedLength];
103
104 std::ios_base::openmode fmode = std::fstream::binary
105 | std::fstream::out
106 | std::fstream::trunc;
107 m_stream.open(filename, fmode);
108 if (m_stream.is_open()) {
109 m_stream << SNAPPY_BYTE1;
110 m_stream << SNAPPY_BYTE2;
111 m_stream.flush();
112 }
113 }
114
~SnappyOutStream()115 SnappyOutStream::~SnappyOutStream()
116 {
117 close();
118 delete [] m_compressedCache;
119 delete [] m_cache;
120 }
121
write(const void * buffer,size_t length)122 bool SnappyOutStream::write(const void *buffer, size_t length)
123 {
124 if (freeCacheSize() > length) {
125 memcpy(m_cachePtr, buffer, length);
126 m_cachePtr += length;
127 } else if (freeCacheSize() == length) {
128 memcpy(m_cachePtr, buffer, length);
129 m_cachePtr += length;
130 flushWriteCache();
131 } else {
132 size_t sizeToWrite = length;
133
134 while (sizeToWrite >= freeCacheSize()) {
135 size_t endSize = freeCacheSize();
136 size_t offset = length - sizeToWrite;
137 memcpy(m_cachePtr, (const char*)buffer + offset, endSize);
138 sizeToWrite -= endSize;
139 m_cachePtr += endSize;
140 flushWriteCache();
141 }
142 if (sizeToWrite) {
143 size_t offset = length - sizeToWrite;
144 memcpy(m_cachePtr, (const char*)buffer + offset, sizeToWrite);
145 m_cachePtr += sizeToWrite;
146 }
147 }
148
149 return true;
150 }
151
close(void)152 void SnappyOutStream::close(void)
153 {
154 flushWriteCache();
155 m_stream.close();
156 delete [] m_cache;
157 m_cache = NULL;
158 m_cachePtr = NULL;
159 }
160
flush(void)161 void SnappyOutStream::flush(void)
162 {
163 flushWriteCache();
164 m_stream.flush();
165 }
166
flushWriteCache(void)167 void SnappyOutStream::flushWriteCache(void)
168 {
169 size_t inputLength = usedCacheSize();
170
171 if (inputLength) {
172 size_t compressedLength;
173
174 ::snappy::RawCompress(m_cache, inputLength,
175 m_compressedCache, &compressedLength);
176
177 writeCompressedLength(compressedLength);
178 m_stream.write(m_compressedCache, compressedLength);
179 m_cachePtr = m_cache;
180 }
181 assert(m_cachePtr == m_cache);
182 }
183
writeCompressedLength(size_t length)184 void SnappyOutStream::writeCompressedLength(size_t length)
185 {
186 unsigned char buf[4];
187 buf[0] = length & 0xff; length >>= 8;
188 buf[1] = length & 0xff; length >>= 8;
189 buf[2] = length & 0xff; length >>= 8;
190 buf[3] = length & 0xff; length >>= 8;
191 assert(length == 0);
192 m_stream.write((const char *)buf, sizeof buf);
193 }
194
195
196 OutStream *
createSnappyStream(const char * filename)197 trace::createSnappyStream(const char *filename)
198 {
199 SnappyOutStream *outStream = new SnappyOutStream(filename);
200 if (!outStream->isOpen()) {
201 os::log("error: could not open %s for writing\n", filename);
202 delete outStream;
203 outStream = nullptr;
204 }
205
206 return outStream;
207 }
208