1 #include "pgreplay.h"
2 
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <sys/time.h>
9 #ifdef WINDOWS
10 #	include <io.h>
11 #	include <winsock.h>
12 #	define FILE_MODE S_IRUSR | S_IWUSR
13 #else
14 #	ifdef HAVE_NETINET_IN_H
15 #		include <netinet/in.h>
16 #	endif
17 #	define FILE_MODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
18 #endif
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22 
23 /* input or output file */
24 static int filed=0;
25 
26 /* functions to convert 64-bit integers between host and network byte order */
27 #ifndef htonll
28 #	ifdef WORDS_BIGENDIAN
29 #		define htonll(x) (x)
30 #		define ntohll(x) (x)
31 #	else
32 #		define htonll(x) ((((uint64_t)htonl(x)) << 32) + htonl(x >> 32))
33 #		define ntohll(x) ((((uint64_t)ntohl(x)) << 32) + ntohl(x >> 32))
34 #	endif
35 #endif
36 
37 /* this length indicates a null value */
38 #define NULL_VALUE 0x80000000
39 
40 /* wrapper functions for read and write */
do_write(const void * const buf,size_t count)41 static int do_write(const void * const buf, size_t count) {
42 	int rc = write(filed, buf, count);
43 
44 	if (-1 == rc) {
45 		perror("Error writing to output file");
46 		return 0;
47 	} else if (count != rc) {
48 		fprintf(stderr, "Error: not all bytes written to output file\n");
49 		return 0;
50 	}
51 
52 	return 1;
53 }
54 
do_read(void * buf,size_t count,int * eof_indicator)55 static int do_read(void *buf, size_t count, int *eof_indicator) {
56 	int rc = read(filed, buf, count);
57 
58 	if (eof_indicator) {
59 		*eof_indicator = 0;
60 	}
61 
62 	if (-1 == rc) {
63 		perror("Error reading from input file");
64 		return 0;
65 	} else if (eof_indicator && (0 == rc)) {
66 		*eof_indicator = 1;
67 	} else if (count != rc) {
68 		fprintf(stderr, "Error: unexpected end of file on input file\n");
69 		return 0;
70 	}
71 
72 	return 1;
73 }
74 
75 /* write a string to the output file */
write_string(char const * const s)76 static int write_string(char const * const s) {
77 	uint32_t u32, len;
78 
79 	/* write length + NULL indicator (4 byte) */
80 	if (NULL == s) {
81 		len = NULL_VALUE;
82 	} else {
83 		len = strlen(s);
84 	}
85 	u32 = htonl(len);
86 	if (! do_write(&u32, 4)) {
87 		return 0;
88 	} else if (NULL != s) {
89 		/* write string */
90 		if (! do_write(s, len)) {
91 			return 0;
92 		}
93 	}
94 
95 	return 1;
96 }
97 
98 /* malloc and read a string from the input file */
read_string(char ** const s)99 static int read_string(char ** const s) {
100 	uint32_t u32, len;
101 
102 	/* read length (4 byte) */
103 	if (! do_read(&u32, 4, NULL)) {
104 		return 0;
105 	}
106 	len = ntohl(u32);
107 	if (NULL_VALUE == len) {
108 		*s = NULL;
109 	} else {
110 		/* allocate the string */
111 		if (! (*s = malloc(len + 1))) {
112 			fprintf(stderr, "Cannot allocate %d bytes of memory\n", len + 1);
113 			return 0;
114 		} else {
115 			/* read string */
116 			if (! do_read(*s, len, NULL)) {
117 				return 0;
118 			}
119 			(*s)[len] = '\0';
120 		}
121 	}
122 
123 	return 1;
124 }
125 
file_provider_init(const char * infile,int cvs,const char * begin_time,const char * end_time,const char * db_only,const char * usr_only)126 int file_provider_init(const char *infile, int cvs, const char *begin_time, const char *end_time, const char *db_only, const char *usr_only) {
127 	int rc = 1;
128 	debug(3, "Entering file_provider_init%s\n", "");
129 
130 	if (NULL == infile) {
131 		filed = 0;
132 #ifdef WINDOWS
133 		setmode(filed, O_BINARY);
134 #endif
135 	} else {
136 		if (-1 == (filed = open(infile, O_RDONLY
137 #ifdef WINDOWS
138 					| O_BINARY
139 #endif
140 					))) {
141 			perror("Error opening input file:");
142 			rc = 0;
143 		}
144 	}
145 
146 	debug(3, "Leaving file_provider_init%s\n", "");
147 
148 	return rc;
149 }
150 
file_provider_finish()151 void file_provider_finish() {
152 	debug(3, "Entering file_provider_finish%s\n", "");
153 
154 	if (0 != filed) {
155 		if (close(filed)) {
156 			perror("Error closing input file:");
157 		}
158 	}
159 
160 	debug(3, "Leaving file_provider_finish%s\n", "");
161 }
162 
file_provider()163 replay_item * file_provider() {
164 	replay_item *r = NULL;
165 	uint16_t u16;
166 	uint32_t u32;
167 	uint64_t u64, session_id = 0;
168 	struct timeval tv;
169 	replay_type type = -1;
170 	int ok = 1, i = 0, eof;
171 	unsigned long count;
172 	char *user, *database, *statement, *name, **values, nl;
173 
174 	debug(3, "Entering file_provider%s\n", "");
175 
176 	/* read timestamp (8 byte) */
177 	if (! do_read(&u32, 4, &eof)) {
178 		ok = 0;
179 	} else {
180 		/* handle expected end-of-file condition */
181 		if (eof) {
182 			return end_item;
183 		}
184 
185 		tv.tv_sec = ntohl(u32);
186 		if (! do_read(&u32, 4, NULL)) {
187 			ok = 0;
188 		} else {
189 			tv.tv_usec = ntohl(u32);
190 		}
191 	}
192 
193 	/* read session_id (8 byte) */
194 	if (ok && do_read(&u64, 8, NULL)) {
195 		session_id = ntohll(u64);
196 	} else {
197 		ok = 0;
198 	}
199 
200 	/* read type (1 byte) */
201 	if (ok) {
202 		u16 = 0;
203 		if (! do_read((char *)(&u16) + 1, 1, NULL)) {
204 			ok = 0;
205 		} else {
206 			type = ntohs(u16);
207 			if ((type < pg_connect) || (type > pg_cancel)) {
208 				fprintf(stderr, "Error: unknown type %u encountered\n", type);
209 				ok = 0;
210 			}
211 		}
212 	}
213 
214 	/* read type specific stuff */
215 	if (ok) {
216 		switch (type) {
217 			case pg_connect:
218 				if (read_string(&user)) {
219 					if (read_string(&database)) {
220 						r = replay_create_connect(&tv, session_id, user, database);
221 						free(database);
222 					}
223 					free(user);
224 				}
225 				break;
226 			case pg_disconnect:
227 				r = replay_create_disconnect(&tv, session_id);
228 				break;
229 			case pg_execute:
230 				if (read_string(&statement)) {
231 					r = replay_create_execute(&tv, session_id, statement);
232 					free(statement);
233 				}
234 				break;
235 			case pg_prepare:
236 				if (read_string(&statement)) {
237 					if (read_string(&name)) {
238 						r = replay_create_prepare(&tv, session_id, name, statement);
239 						free(name);
240 					}
241 					free(statement);
242 				}
243 				break;
244 			case pg_exec_prepared:
245 				/* read statement name */
246 				if (read_string(&name)) {
247 					/* number of bind arguments (2 byte) */
248 					if (do_read(&u16, 2, NULL)) {
249 						count = ntohs(u16);
250 						if (NULL == (values = calloc(count, sizeof(char *)))) {
251 							fprintf(stderr, "Cannot allocate %lu bytes of memory\n", count * sizeof(char *));
252 						} else {
253 							/* read bind values */
254 							while (i < count) {
255 								if (read_string(values + i)) {
256 									++i;
257 								} else {
258 									break;
259 								}
260 							}
261 							if (i == count) {
262 								r = replay_create_exec_prepared(&tv, session_id, name, count, values);
263 							}
264 							while (--i >= 0) {
265 								if (values[i]) {
266 									free(values[i]);
267 								}
268 							}
269 							free(values);
270 						}
271 					}
272 					free(name);
273 				}
274 				break;
275 			case pg_cancel:
276 				r = replay_create_cancel(&tv, session_id);
277 				break;
278 		}
279 	}
280 
281 	/* read new-line at the end of the record */
282 	if (r && do_read(&nl, 1, NULL) && ('\n' != nl)) {
283 		fprintf(stderr, "Error: missing new-line at end of line\n");
284 		if (r) {
285 			replay_free(r);
286 			r = NULL;
287 		}
288 	}
289 
290 	if (r && (1 <= debug_level) && (end_item != r)) {
291 		replay_print_debug(r);
292 	}
293 
294 	debug(3, "Leaving file_provider%s\n", "");
295 
296     return r;
297 }
298 
file_consumer_init(const char * outfile,const char * host,int port,const char * passwd,double factor)299 int file_consumer_init(const char *outfile, const char *host, int port, const char *passwd, double factor) {
300 	debug(3, "Entering file_consumer_init%s\n", "");
301 
302 	if ((NULL == outfile) || ('\0' == outfile[0])
303 			|| (('-' == outfile[0]) && ('\0' == outfile[1]))) {
304 		filed = 1;
305 #ifdef WINDOWS
306 		/* set stdout to binary mode */
307 		setmode(filed, O_BINARY);
308 #endif
309 	} else {
310 		if (-1 == (filed = open(outfile, O_WRONLY | O_CREAT | O_TRUNC
311 #ifdef WINDOWS
312 					| O_BINARY
313 #endif
314 					, FILE_MODE))) {
315 			perror("Error opening output file:");
316 			return 0;
317 		}
318 	}
319 
320 	debug(3, "Leaving file_consumer_init%s\n", "");
321 	return 1;
322 }
323 
file_consumer_finish()324 void file_consumer_finish() {
325 	debug(3, "Entering file_consumer_finish%s\n", "");
326 
327 	if (1 != filed) {
328 		if (close(filed)) {
329 			perror("Error closing output file:");
330 		}
331 	}
332 
333 	debug(3, "Leaving file_consumer_finish%s\n", "");
334 }
335 
file_consumer(replay_item * item)336 int file_consumer(replay_item *item) {
337 	const struct timeval *tv = replay_get_time(item);
338 	uint16_t count;
339 	const replay_type type = replay_get_type(item);
340 	uint16_t u16, i;
341 	uint32_t u32;
342 	uint64_t u64;
343 	int rc = 1;
344 	const char * const *values;
345 
346 	debug(3, "Entering file_consumer%s\n", "");
347 
348 	/* write timestamp (8 byte) */
349 	u32 = htonl(tv->tv_sec);
350 	if (! do_write(&u32, 4)) {
351 		rc = -1;
352 	} else {
353 		u32 = htonl(tv->tv_usec);
354 		if (! do_write(&u32, 4)) {
355 			rc = -1;
356 		}
357 	}
358 
359 	/* write session_id (8 byte) */
360 	if (1 == rc) {
361 		u64 = htonll(replay_get_session_id(item));
362 		if (! do_write(&u64, 8)) {
363 			rc = -1;
364 		}
365 	}
366 
367 	/* write type (1 byte) */
368 	if (1 == rc) {
369 		u16 = htons((uint16_t) type);
370 		if (! do_write((char *)(&u16) + 1, 1)) {
371 			rc = -1;
372 		}
373 	}
374 
375 	/* write type specific stuff */
376 	if (1 == rc) {
377 		switch (type) {
378 			case pg_connect:
379 				if (! write_string(replay_get_user(item))) {
380 					rc = -1;
381 				} else if (! write_string(replay_get_database(item))) {
382 					rc = -1;
383 				}
384 				break;
385 			case pg_disconnect:
386 				break;
387 			case pg_execute:
388 				if (! write_string(replay_get_statement(item))) {
389 					rc = -1;
390 				}
391 				break;
392 			case pg_prepare:
393 				if (! write_string(replay_get_statement(item))) {
394 					rc = -1;
395 				} else if (! write_string(replay_get_name(item))) {
396 					rc = -1;
397 				}
398 				break;
399 			case pg_exec_prepared:
400 				count = replay_get_valuecount(item);
401 				/* write statement name */
402 				if (! write_string(replay_get_name(item))) {
403 					rc = -1;
404 				} else {
405 					/* write count (2 byte) */
406 					u16 = htons(count);
407 					if (! do_write(&u16, 2)) {
408 						rc = -1;
409 					} else {
410 						/* write values */
411 						values = replay_get_values(item);
412 						for (i=0; i<count; ++i) {
413 							if (! write_string(values[i])) {
414 								rc = -1;
415 								break;
416 							}
417 						}
418 					}
419 				}
420 				break;
421 			case pg_cancel:
422 				break;
423 		}
424 	}
425 
426 	/* write new-line (1 byte) */
427 	if (1 == rc) {
428 		if (! do_write("\n", 1)) {
429 			rc = -1;
430 		}
431 	}
432 
433 	replay_free(item);
434 
435 	debug(3, "Leaving file_consumer%s\n", "");
436 	return rc;
437 }
438