1
2 /* Web Polygraph http://www.web-polygraph.org/
3 * Copyright 2003-2011 The Measurement Factory
4 * Licensed under the Apache License, Version 2.0 */
5
6 #include "base/polygraph.h"
7
8 #ifdef HAVE_UNISTD_H
9 #include "xstd/h/os_std.h"
10 #endif
11
12 #include "xstd/h/string.h"
13
14 #include "base/OLog.h"
15 #include "xstd/gadgets.h"
16 #include "xstd/ZFStream.h"
17
18
OLog()19 OLog::OLog(): theStream(0), theZStream(0), theEntry(0), theEntryTag(-1) {
20 theCapacity = Size::KB(64); // default
21 theBuf = new char[theCapacity];
22 theSize = 0;
23 thePos = 0;
24 }
25
~OLog()26 OLog::~OLog() {
27 if (theStream)
28 close();
29 delete[] theBuf;
30 }
31
stream(const String & aName,ostream * aStream)32 void OLog::stream(const String &aName, ostream *aStream) {
33 Assert(!theStream && aStream);
34 theName = aName;
35 theStream = aStream;
36
37 // make theStream unbuffered
38 theStream->flush();
39 theStream->rdbuf()->pubsetbuf(0, 0);
40
41 putHeader();
42 }
43
capacity(Size aCap)44 void OLog::capacity(Size aCap) {
45 Assert(theCapacity);
46 Assert(aCap >= theCapacity); // cannot shrink
47
48 if (aCap > theCapacity) {
49 theCapacity = aCap;
50
51 // allocate new chunk and copy
52 char *buf = new char[theCapacity];
53 if (theSize)
54 memcpy(buf, theBuf, theSize);
55 if (theEntry)
56 theEntry = buf + (theEntry - theBuf);
57 delete[] theBuf;
58 theBuf = buf;
59 }
60 }
61
resize(Size minCap)62 void OLog::resize(Size minCap) {
63 Assert(minCap >= theCapacity); // cannot shrink
64
65 // exponential growth
66 Size newCap = theCapacity;
67 while (newCap < minCap)
68 newCap *= 2;
69
70 capacity(newCap);
71 }
72
close()73 void OLog::close() {
74 if (theStream) {
75
76 // abort unfinished entry, if any
77 if (theEntry) {
78 const Size readySize = (Size)(theEntry - theBuf);
79 thePos -= theSize - readySize;
80 theSize = readySize;
81 theEntry = 0;
82 }
83
84 putTrailer();
85 flush();
86 delete theZStream;
87 theZStream = 0;
88 delete theStream;
89 theStream = 0;
90 }
91 }
92
flush(Size maxSize)93 void OLog::flush(Size maxSize) {
94 if (!theStream)
95 return;
96
97 // do not flush current entry if any; its size is yet unknown
98 const Size readySz = Min(maxSize,
99 theEntry ? (Size)(theEntry - theBuf) : theSize);
100 if (readySz > 0) {
101 write(theBuf, readySz);
102 if (theSize > readySz) {
103 // move leftovers to the beginning of a buffer
104 theSize -= readySz;
105 memmove(theBuf, theBuf + readySz, theSize);
106 } else {
107 Assert(theSize == readySz); // wrote everything
108 theSize = 0;
109 }
110 if (theEntry)
111 theEntry -= readySz;
112 }
113 }
114
write(const char * const buf,const Size size)115 void OLog::write(const char *const buf, const Size size) {
116 Must(theStream);
117 if (theZStream)
118 Should(theZStream->write(buf, size));
119 else
120 Should(theStream->write(buf, size));
121 }
122
overflow(const void * buf,Size size)123 void OLog::overflow(const void *buf, Size size) {
124 // buffer what fits
125 int space = theCapacity - theSize;
126 if (space > 0) {
127 put(buf, space);
128 buf = ((const char*)buf) + space;
129 size -= space;
130 }
131
132 flush();
133
134 // grow if does not fit
135 space = theCapacity - theSize;
136 if (space < size)
137 resize(theSize + size);
138
139 Assert(theSize + size <= theCapacity);
140 put(buf, size);
141 }
142
begEntry(int tag)143 void OLog::begEntry(int tag) {
144 Assert(tag > 0);
145 Assert(!theEntry);
146
147 theEntry = theBuf + theSize;
148 theEntryTag = tag;
149 puti(0); // size placeholder
150 puti(tag);
151 }
152
endEntry()153 void OLog::endEntry() {
154 Assert(theEntry);
155 Assert(theEntryTag > 0);
156 const int x = htonl((theBuf + theSize) - theEntry);
157 memcpy(theEntry, &x, sizeof(x)); // record actual entry size
158
159 // update directory if needed
160 if (theDir.count() > theEntryTag) { // old tag
161 if (!theDir[theEntryTag])
162 theDir[theEntryTag] = thePos;
163 else
164 if (theDir[theEntryTag] > 0)
165 theDir[theEntryTag] = -theDir[theEntryTag];
166 } else {
167 theDir.put(thePos, theEntryTag);
168 }
169
170 theEntryTag = -1;
171 theEntry = 0;
172 }
173
putHeader()174 void OLog::putHeader() {
175 puti(27); // current version
176 puti(27); // required min version to support
177 bool doCompression = false;
178 if (zlib::Supported) {
179 const int zlibHdrSz = 32;
180 char zlibHdr[zlibHdrSz];
181 memset(zlibHdr, 0, zlibHdrSz);
182 strcpy(zlibHdr, "zlib");
183 puti(zlibHdrSz); // size of extra headers
184 put(zlibHdr, zlibHdrSz);
185 doCompression = true;
186 } else
187 puti(0); // size of extra headers
188
189 if (doCompression) {
190 flush();
191 theZStream = new zlib::OFStream(*theStream);
192 }
193 }
194
putTrailer()195 void OLog::putTrailer() {
196 puti(0); // mark end-of-log
197 puti(0); // reserved
198 const Size dirPos = thePos;
199 (*this) << theDir;
200 puti(thePos - dirPos); // dir size
201 }
202
puti(const int * xs,int count)203 void OLog::puti(const int *xs, int count) {
204 puti(count);
205 for (int i = 0; i < count; ++i)
206 puti(xs[i]);
207 }
208