1 
2 /* Web Polygraph       http://www.web-polygraph.org/
3  * Copyright 2003-2011 The Measurement Factory
4  * Licensed under the Apache License, Version 2.0 */
5 
6 #include "base/polygraph.h"
7 
8 #ifdef HAVE_UNISTD_H
9 #include "xstd/h/os_std.h"
10 #endif
11 
12 #include "xstd/h/string.h"
13 
14 #include "base/OLog.h"
15 #include "xstd/gadgets.h"
16 #include "xstd/ZFStream.h"
17 
18 
OLog()19 OLog::OLog(): theStream(0), theZStream(0), theEntry(0), theEntryTag(-1) {
20 	theCapacity = Size::KB(64); // default
21 	theBuf = new char[theCapacity];
22 	theSize = 0;
23 	thePos = 0;
24 }
25 
~OLog()26 OLog::~OLog() {
27 	if (theStream)
28 		close();
29 	delete[] theBuf;
30 }
31 
stream(const String & aName,ostream * aStream)32 void OLog::stream(const String &aName, ostream *aStream) {
33 	Assert(!theStream && aStream);
34 	theName = aName;
35 	theStream = aStream;
36 
37 	// make theStream unbuffered
38 	theStream->flush();
39 	theStream->rdbuf()->pubsetbuf(0, 0);
40 
41 	putHeader();
42 }
43 
capacity(Size aCap)44 void OLog::capacity(Size aCap) {
45 	Assert(theCapacity);
46 	Assert(aCap >= theCapacity); // cannot shrink
47 
48 	if (aCap > theCapacity) {
49 		theCapacity = aCap;
50 
51 		// allocate new chunk and copy
52 		char *buf = new char[theCapacity];
53 		if (theSize)
54 			memcpy(buf, theBuf, theSize);
55 		if (theEntry)
56 			theEntry = buf + (theEntry - theBuf);
57 		delete[] theBuf;
58 		theBuf = buf;
59 	}
60 }
61 
resize(Size minCap)62 void OLog::resize(Size minCap) {
63 	Assert(minCap >= theCapacity); // cannot shrink
64 
65 	// exponential growth
66 	Size newCap = theCapacity;
67 	while (newCap < minCap)
68 		newCap *= 2;
69 
70 	capacity(newCap);
71 }
72 
close()73 void OLog::close() {
74 	if (theStream) {
75 
76 		// abort unfinished entry, if any
77 		if (theEntry) {
78 			const Size readySize = (Size)(theEntry - theBuf);
79 			thePos -= theSize - readySize;
80 			theSize = readySize;
81 			theEntry = 0;
82 		}
83 
84 		putTrailer();
85 		flush();
86 		delete theZStream;
87 		theZStream = 0;
88 		delete theStream;
89 		theStream = 0;
90 	}
91 }
92 
flush(Size maxSize)93 void OLog::flush(Size maxSize) {
94 	if (!theStream)
95 		return;
96 
97 	// do not flush current entry if any; its size is yet unknown
98 	const Size readySz = Min(maxSize,
99 		theEntry ? (Size)(theEntry - theBuf) : theSize);
100 	if (readySz > 0) {
101 		write(theBuf, readySz);
102 		if (theSize > readySz) {
103 			// move leftovers to the beginning of a buffer
104 			theSize -= readySz;
105 			memmove(theBuf, theBuf + readySz, theSize);
106 		} else {
107 			Assert(theSize == readySz); // wrote everything
108 			theSize = 0;
109 		}
110 		if (theEntry)
111 			theEntry -= readySz;
112 	}
113 }
114 
write(const char * const buf,const Size size)115 void OLog::write(const char *const buf, const Size size) {
116 	Must(theStream);
117 	if (theZStream)
118 		Should(theZStream->write(buf, size));
119 	else
120 		Should(theStream->write(buf, size));
121 }
122 
overflow(const void * buf,Size size)123 void OLog::overflow(const void *buf, Size size) {
124 	// buffer what fits
125 	int space = theCapacity - theSize;
126 	if (space > 0) {
127 		put(buf, space);
128 		buf = ((const char*)buf) + space;
129 		size -= space;
130 	}
131 
132 	flush();
133 
134 	// grow if does not fit
135 	space = theCapacity - theSize;
136 	if (space < size)
137 		resize(theSize + size);
138 
139 	Assert(theSize + size <= theCapacity);
140 	put(buf, size);
141 }
142 
begEntry(int tag)143 void OLog::begEntry(int tag) {
144 	Assert(tag > 0);
145 	Assert(!theEntry);
146 
147 	theEntry = theBuf + theSize;
148 	theEntryTag = tag;
149 	puti(0);		// size placeholder
150 	puti(tag);
151 }
152 
endEntry()153 void OLog::endEntry() {
154 	Assert(theEntry);
155 	Assert(theEntryTag > 0);
156 	const int x = htonl((theBuf + theSize) - theEntry);
157 	memcpy(theEntry, &x, sizeof(x)); // record actual entry size
158 
159 	// update directory if needed
160 	if (theDir.count() > theEntryTag) { // old tag
161 		if (!theDir[theEntryTag])
162 			theDir[theEntryTag] = thePos;
163 		else
164 		if (theDir[theEntryTag] > 0)
165 			theDir[theEntryTag] = -theDir[theEntryTag];
166 	} else {
167 		theDir.put(thePos, theEntryTag);
168 	}
169 
170 	theEntryTag = -1;
171 	theEntry = 0;
172 }
173 
putHeader()174 void OLog::putHeader() {
175 	puti(27); // current version
176 	puti(27); // required min version to support
177 	bool doCompression = false;
178 	if (zlib::Supported) {
179 		const int zlibHdrSz = 32;
180 		char zlibHdr[zlibHdrSz];
181 		memset(zlibHdr, 0, zlibHdrSz);
182 		strcpy(zlibHdr, "zlib");
183 		puti(zlibHdrSz); // size of extra headers
184 		put(zlibHdr, zlibHdrSz);
185 		doCompression = true;
186 	} else
187 		puti(0); // size of extra headers
188 
189 	if (doCompression) {
190 		flush();
191 		theZStream = new zlib::OFStream(*theStream);
192 	}
193 }
194 
putTrailer()195 void OLog::putTrailer() {
196 	puti(0); // mark end-of-log
197 	puti(0); // reserved
198 	const Size dirPos = thePos;
199 	(*this) << theDir;
200 	puti(thePos - dirPos); // dir size
201 }
202 
puti(const int * xs,int count)203 void OLog::puti(const int *xs, int count) {
204 	puti(count);
205 	for (int i = 0; i < count; ++i)
206 		puti(xs[i]);
207 }
208