1 /*-
2  * Copyright (c) 2005-2008 Poul-Henning Kamp
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  */
28 
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <time.h>
35 #include <sys/endian.h>
36 
37 #include <zlib.h>
38 
39 #include "fifolog.h"
40 #include "libfifolog.h"
41 #include "libfifolog_int.h"
42 #include "fifolog_write.h"
43 #include "miniobj.h"
44 
45 #define ALLOC(ptr, size) do {                   \
46 	(*(ptr)) = calloc(size, 1);             \
47 	assert(*(ptr) != NULL);                 \
48 } while (0)
49 
50 
51 const char *fifolog_write_statnames[] = {
52 [FIFOLOG_PT_BYTES_PRE] =	"Bytes before compression",
53 [FIFOLOG_PT_BYTES_POST] =	"Bytes after compression",
54 [FIFOLOG_PT_WRITES] =		"Writes",
55 [FIFOLOG_PT_FLUSH] =		"Flushes",
56 [FIFOLOG_PT_SYNC] =		"Syncs",
57 [FIFOLOG_PT_RUNTIME] =		"Runtime"
58 };
59 
60 /*
61  * Check that everything is all right
62  */
63 static void
64 fifolog_write_assert(const struct fifolog_writer *f)
65 {
66 
67 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
68 	assert(f->iptr == f->ff->zs->next_in + f->ff->zs->avail_in);
69 	assert(f->ff->zs->next_out + f->ff->zs->avail_out == \
70 	    f->ff->recbuf + f->ff->recsize);
71 }
72 
73 struct fifolog_writer *
74 fifolog_write_new(void)
75 {
76 	struct fifolog_writer *f;
77 
78 	ALLOC(&f, sizeof *f);
79 	f->magic = FIFOLOG_WRITER_MAGIC;
80 	return (f);
81 }
82 
83 void
84 fifolog_write_destroy(struct fifolog_writer *f)
85 {
86 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
87 	free(f);
88 }
89 
90 void
91 fifolog_write_close(struct fifolog_writer *f)
92 {
93 
94 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
95 	fifolog_int_close(&f->ff);
96 	free(f->ff);
97 	if (f->ibuf != NULL)
98 		free(f->ibuf);
99 	free(f);
100 }
101 
102 static void
103 fifo_prepobuf(struct fifolog_writer *f, time_t now, int flag)
104 {
105 
106 	memset(f->ff->recbuf, 0, f->ff->recsize);
107 	f->ff->zs->next_out = f->ff->recbuf + 5;
108 	f->ff->zs->avail_out = f->ff->recsize - 5;
109 	if (f->recno == 0 && f->seq == 0) {
110 		srandomdev();
111 		do {
112 			f->seq = random();
113 		} while (f->seq == 0);
114 	}
115 	be32enc(f->ff->recbuf, f->seq++);
116 	f->ff->recbuf[4] = f->flag;
117 	f->flag = 0;
118 	if (flag) {
119 		f->ff->recbuf[4] |= FIFOLOG_FLG_SYNC;
120 		be32enc(f->ff->recbuf + 5, (u_int)now);
121 		f->ff->zs->next_out += 4;
122 		f->ff->zs->avail_out -= 4;
123 	}
124 	fifolog_write_assert(f);
125 }
126 
127 const char *
128 fifolog_write_open(struct fifolog_writer *f, const char *fn, unsigned writerate, unsigned syncrate, int compression)
129 {
130 	const char *es;
131 	int i;
132 	time_t now;
133 	off_t o;
134 
135 	CHECK_OBJ_NOTNULL(f, FIFOLOG_WRITER_MAGIC);
136 
137 	/* Check for legal compression value */
138 	if (compression < Z_DEFAULT_COMPRESSION ||
139 	    compression > Z_BEST_COMPRESSION)
140 		return ("Illegal compression value");
141 
142 	f->writerate = writerate;
143 	f->syncrate = syncrate;
144 	f->compression = compression;
145 
146 	/* Reset statistics */
147 	memset(f->cnt, 0, sizeof f->cnt);
148 
149 	es = fifolog_int_open(&f->ff, fn, 1);
150 	if (es != NULL)
151 		return (es);
152 	es = fifolog_int_findend(f->ff, &o);
153 	if (es != NULL)
154 		return (es);
155 	i = fifolog_int_read(f->ff, o);
156 	if (i)
157 		return ("Read error, looking for seq");
158 	f->seq = be32dec(f->ff->recbuf);
159 	if (f->seq == 0) {
160 		/* Empty fifolog */
161 		f->seq = random();
162 	} else {
163 		f->recno = o + 1;
164 		f->seq++;
165 	}
166 
167 	f->ibufsize = 32768;
168 	ALLOC(&f->ibuf, f->ibufsize);
169 	f->iptr = f->ibuf;
170 	f->ff->zs->next_in = f->iptr;
171 	i = deflateInit(f->ff->zs, (int)f->compression);
172 	assert(i == Z_OK);
173 
174 	f->flag |= FIFOLOG_FLG_RESTART;
175 
176 	time(&now);
177 	fifo_prepobuf(f, now, 1);
178 	f->starttime = now;
179 
180 	fifolog_write_assert(f);
181 	return (NULL);
182 }
183 
184 static void
185 fifo_writerec(struct fifolog_writer *f)
186 {
187 	int i;
188 	time_t t;
189 
190 	fifolog_write_assert(f);
191 	f->writes_since_sync++;
192 
193 	assert(f->recno < f->ff->logsize);
194 	f->cnt[FIFOLOG_PT_BYTES_POST] += f->ff->recsize - f->ff->zs->avail_out;
195 	if (f->ff->zs->avail_out == 0) {
196 		/* nothing */
197 	} else if (f->ff->zs->avail_out <= 255) {
198 		f->ff->recbuf[f->ff->recsize - 1] =
199 		    (u_char)f->ff->zs->avail_out;
200 		f->ff->recbuf[4] |= FIFOLOG_FLG_1BYTE;
201 	} else {
202 		be32enc(f->ff->recbuf + f->ff->recsize - 4,
203 		    f->ff->zs->avail_out);
204 		f->ff->recbuf[4] |= FIFOLOG_FLG_4BYTE;
205 	}
206 	i = pwrite(f->ff->fd, f->ff->recbuf, f->ff->recsize,
207 		(f->recno + 1) * f->ff->recsize);
208 	assert (i == (int)f->ff->recsize);
209 	if (++f->recno == f->ff->logsize)
210 		f->recno = 0;
211 	f->cnt[FIFOLOG_PT_WRITES]++;
212 	time(&t);
213 	f->cnt[FIFOLOG_PT_RUNTIME] = t - f->starttime; /*lint !e776 */
214 	fifolog_write_assert(f);
215 }
216 
217 int
218 fifolog_write_poll(struct fifolog_writer *f, time_t now)
219 {
220 	int i, fl, bo, bf;
221 
222 	if (now == 0)
223 		time(&now);
224 
225 	fifolog_write_assert(f);
226 	if (f->cleanup || now >= (int)(f->lastsync + f->syncrate)) {
227 		/*
228 		 * We always check the sync timer, otherwise a flood of data
229 		 * would not get any sync records at all
230 		 */
231 		f->cleanup = 0;
232 		fl = Z_FINISH;
233 		f->lastsync = now;
234 		f->lastwrite = now;
235 		f->cnt[FIFOLOG_PT_SYNC]++;
236 	} else if (f->ff->zs->avail_in == 0 &&
237 	    now >= (int)(f->lastwrite + f->writerate)) {
238 		/*
239 		 * We only check for writerate timeouts when the input
240 		 * buffer is empty.  It would be silly to force a write if
241 		 * pending input could cause it to happen on its own.
242 		 */
243 		fl = Z_SYNC_FLUSH;
244 		f->lastwrite = now;
245 		f->cnt[FIFOLOG_PT_FLUSH]++;
246 	} else if (f->ff->zs->avail_in == 0)
247 		return (0);			/* nothing to do */
248 	else
249 		fl = Z_NO_FLUSH;
250 
251 	for (;;) {
252 		assert(f->ff->zs->avail_out > 0);
253 
254 		bf = f->ff->zs->avail_out;
255 
256 		i = deflate(f->ff->zs, fl);
257 		assert (i == Z_OK || i == Z_BUF_ERROR || i == Z_STREAM_END);
258 
259 		bo = f->ff->zs->avail_out;
260 
261 		/* If we have output space and not in a hurry.. */
262 		if (bo > 0 && fl == Z_NO_FLUSH)
263 			break;
264 
265 		/* Write output buffer, if anything in it */
266 		if (bo != bf)
267 			fifo_writerec(f);
268 
269 		/* If the buffer were full, we need to check again */
270 		if (bo == 0) {
271 			fifo_prepobuf(f, now, 0);
272 			continue;
273 		}
274 
275 		if (fl == Z_FINISH) {
276 			/* Make next record a SYNC record */
277 			fifo_prepobuf(f, now, 1);
278 			/* And reset the zlib engine */
279 			i = deflateReset(f->ff->zs);
280 			assert(i == Z_OK);
281 			f->writes_since_sync = 0;
282 		} else {
283 			fifo_prepobuf(f, now, 0);
284 		}
285 		break;
286 	}
287 
288 	if (f->ff->zs->avail_in == 0) {
289 		/* Reset input buffer when empty */
290 		f->iptr = f->ibuf;
291 		f->ff->zs->next_in = f->iptr;
292 	}
293 
294 	fifolog_write_assert(f);
295 	return (1);
296 }
297 
298 static void
299 fifolog_acct(struct fifolog_writer *f, unsigned bytes)
300 {
301 
302 	f->ff->zs->avail_in += bytes;
303 	f->iptr += bytes;
304 	f->cnt[FIFOLOG_PT_BYTES_PRE] += bytes;
305 }
306 
307 /*
308  * Attempt to write an entry.
309  * Return zero if there is no space, one otherwise
310  */
311 
312 int
313 fifolog_write_bytes(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
314 {
315 	u_int l;
316 	const unsigned char *p;
317 
318 	fifolog_write_assert(f);
319 	assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
320 	assert(ptr != NULL);
321 
322 	p = ptr;
323 	if (len == 0) {
324 		len = strlen(ptr) + 1;
325 		l = 4 + len;		/* id */
326 	} else {
327 		assert(len <= 255);
328 		id |= FIFOLOG_LENGTH;
329 		l = 5 + len;		/* id + len */
330 	}
331 
332 	l += 4; 		/* A timestamp may be necessary */
333 
334 	/* Now do timestamp, if needed */
335 	if (now == 0)
336 		time(&now);
337 
338 	assert(l < f->ibufsize);
339 
340 	/* Return if there is not enough space */
341 	if (f->iptr + l > f->ibuf + f->ibufsize)
342 		return (0);
343 
344 	if (now != f->last) {
345 		id |= FIFOLOG_TIMESTAMP;
346 		f->last = now;
347 	}
348 
349 	/* Emit instance+flag and length */
350 	be32enc(f->iptr, id);
351 	fifolog_acct(f, 4);
352 
353 	if (id & FIFOLOG_TIMESTAMP) {
354 		be32enc(f->iptr, (uint32_t)f->last);
355 		fifolog_acct(f, 4);
356 	}
357 	if (id & FIFOLOG_LENGTH) {
358 		f->iptr[0] = (u_char)len;
359 		fifolog_acct(f, 1);
360 	}
361 
362 	assert (len > 0);
363 	memcpy(f->iptr, p, len);
364 	fifolog_acct(f, len);
365 	fifolog_write_assert(f);
366 	return (1);
367 }
368 
369 /*
370  * Write an entry, polling until success.
371  * Long binary entries are broken into 255 byte chunks.
372  */
373 
374 void
375 fifolog_write_bytes_poll(struct fifolog_writer *f, uint32_t id, time_t now, const void *ptr, unsigned len)
376 {
377 	u_int l;
378 	const unsigned char *p;
379 
380 	fifolog_write_assert(f);
381 
382 	assert(!(id & (FIFOLOG_TIMESTAMP|FIFOLOG_LENGTH)));
383 	assert(ptr != NULL);
384 
385 	if (len == 0) {
386 		while (!fifolog_write_bytes(f, id, now, ptr, len)) {
387 			(void)fifolog_write_poll(f, now);
388 			(void)usleep(10000);
389 		}
390 	} else {
391 		p = ptr;
392 		for (p = ptr; len > 0; len -= l, p += l) {
393 			l = len;
394 			if (l > 255)
395 				l = 255;
396 			while (!fifolog_write_bytes(f, id, now, p, l)) {
397 				(void)fifolog_write_poll(f, now);
398 				(void)usleep(10000);
399 			}
400 		}
401 	}
402 	fifolog_write_assert(f);
403 }
404 
405 int
406 fifolog_write_flush(struct fifolog_writer *f)
407 {
408 	int i;
409 
410 	fifolog_write_assert(f);
411 
412 	f->cleanup = 1;
413 	for (i = 0; fifolog_write_poll(f, 0); i = 1)
414 		continue;
415 	fifolog_write_assert(f);
416 	return (i);
417 }
418