1 #include "pgreplay.h"
2
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <sys/time.h>
9 #ifdef WINDOWS
10 # include <io.h>
11 # include <winsock.h>
12 # define FILE_MODE S_IRUSR | S_IWUSR
13 #else
14 # ifdef HAVE_NETINET_IN_H
15 # include <netinet/in.h>
16 # endif
17 # define FILE_MODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
18 #endif
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22
23 /* input or output file */
24 static int filed=0;
25
26 /* functions to convert 64-bit integers between host and network byte order */
27 #ifndef htonll
28 # ifdef WORDS_BIGENDIAN
29 # define htonll(x) (x)
30 # define ntohll(x) (x)
31 # else
32 # define htonll(x) ((((uint64_t)htonl(x)) << 32) + htonl(x >> 32))
33 # define ntohll(x) ((((uint64_t)ntohl(x)) << 32) + ntohl(x >> 32))
34 # endif
35 #endif
36
37 /* this length indicates a null value */
38 #define NULL_VALUE 0x80000000
39
40 /* wrapper functions for read and write */
do_write(const void * const buf,size_t count)41 static int do_write(const void * const buf, size_t count) {
42 int rc = write(filed, buf, count);
43
44 if (-1 == rc) {
45 perror("Error writing to output file");
46 return 0;
47 } else if (count != rc) {
48 fprintf(stderr, "Error: not all bytes written to output file\n");
49 return 0;
50 }
51
52 return 1;
53 }
54
do_read(void * buf,size_t count,int * eof_indicator)55 static int do_read(void *buf, size_t count, int *eof_indicator) {
56 int rc = read(filed, buf, count);
57
58 if (eof_indicator) {
59 *eof_indicator = 0;
60 }
61
62 if (-1 == rc) {
63 perror("Error reading from input file");
64 return 0;
65 } else if (eof_indicator && (0 == rc)) {
66 *eof_indicator = 1;
67 } else if (count != rc) {
68 fprintf(stderr, "Error: unexpected end of file on input file\n");
69 return 0;
70 }
71
72 return 1;
73 }
74
75 /* write a string to the output file */
write_string(char const * const s)76 static int write_string(char const * const s) {
77 uint32_t u32, len;
78
79 /* write length + NULL indicator (4 byte) */
80 if (NULL == s) {
81 len = NULL_VALUE;
82 } else {
83 len = strlen(s);
84 }
85 u32 = htonl(len);
86 if (! do_write(&u32, 4)) {
87 return 0;
88 } else if (NULL != s) {
89 /* write string */
90 if (! do_write(s, len)) {
91 return 0;
92 }
93 }
94
95 return 1;
96 }
97
98 /* malloc and read a string from the input file */
read_string(char ** const s)99 static int read_string(char ** const s) {
100 uint32_t u32, len;
101
102 /* read length (4 byte) */
103 if (! do_read(&u32, 4, NULL)) {
104 return 0;
105 }
106 len = ntohl(u32);
107 if (NULL_VALUE == len) {
108 *s = NULL;
109 } else {
110 /* allocate the string */
111 if (! (*s = malloc(len + 1))) {
112 fprintf(stderr, "Cannot allocate %d bytes of memory\n", len + 1);
113 return 0;
114 } else {
115 /* read string */
116 if (! do_read(*s, len, NULL)) {
117 return 0;
118 }
119 (*s)[len] = '\0';
120 }
121 }
122
123 return 1;
124 }
125
file_provider_init(const char * infile,int cvs,const char * begin_time,const char * end_time,const char * db_only,const char * usr_only)126 int file_provider_init(const char *infile, int cvs, const char *begin_time, const char *end_time, const char *db_only, const char *usr_only) {
127 int rc = 1;
128 debug(3, "Entering file_provider_init%s\n", "");
129
130 if (NULL == infile) {
131 filed = 0;
132 #ifdef WINDOWS
133 setmode(filed, O_BINARY);
134 #endif
135 } else {
136 if (-1 == (filed = open(infile, O_RDONLY
137 #ifdef WINDOWS
138 | O_BINARY
139 #endif
140 ))) {
141 perror("Error opening input file:");
142 rc = 0;
143 }
144 }
145
146 debug(3, "Leaving file_provider_init%s\n", "");
147
148 return rc;
149 }
150
file_provider_finish()151 void file_provider_finish() {
152 debug(3, "Entering file_provider_finish%s\n", "");
153
154 if (0 != filed) {
155 if (close(filed)) {
156 perror("Error closing input file:");
157 }
158 }
159
160 debug(3, "Leaving file_provider_finish%s\n", "");
161 }
162
file_provider()163 replay_item * file_provider() {
164 replay_item *r = NULL;
165 uint16_t u16;
166 uint32_t u32;
167 uint64_t u64, session_id = 0;
168 struct timeval tv;
169 replay_type type = -1;
170 int ok = 1, i = 0, eof;
171 unsigned long count;
172 char *user, *database, *statement, *name, **values, nl;
173
174 debug(3, "Entering file_provider%s\n", "");
175
176 /* read timestamp (8 byte) */
177 if (! do_read(&u32, 4, &eof)) {
178 ok = 0;
179 } else {
180 /* handle expected end-of-file condition */
181 if (eof) {
182 return end_item;
183 }
184
185 tv.tv_sec = ntohl(u32);
186 if (! do_read(&u32, 4, NULL)) {
187 ok = 0;
188 } else {
189 tv.tv_usec = ntohl(u32);
190 }
191 }
192
193 /* read session_id (8 byte) */
194 if (ok && do_read(&u64, 8, NULL)) {
195 session_id = ntohll(u64);
196 } else {
197 ok = 0;
198 }
199
200 /* read type (1 byte) */
201 if (ok) {
202 u16 = 0;
203 if (! do_read((char *)(&u16) + 1, 1, NULL)) {
204 ok = 0;
205 } else {
206 type = ntohs(u16);
207 if ((type < pg_connect) || (type > pg_cancel)) {
208 fprintf(stderr, "Error: unknown type %u encountered\n", type);
209 ok = 0;
210 }
211 }
212 }
213
214 /* read type specific stuff */
215 if (ok) {
216 switch (type) {
217 case pg_connect:
218 if (read_string(&user)) {
219 if (read_string(&database)) {
220 r = replay_create_connect(&tv, session_id, user, database);
221 free(database);
222 }
223 free(user);
224 }
225 break;
226 case pg_disconnect:
227 r = replay_create_disconnect(&tv, session_id);
228 break;
229 case pg_execute:
230 if (read_string(&statement)) {
231 r = replay_create_execute(&tv, session_id, statement);
232 free(statement);
233 }
234 break;
235 case pg_prepare:
236 if (read_string(&statement)) {
237 if (read_string(&name)) {
238 r = replay_create_prepare(&tv, session_id, name, statement);
239 free(name);
240 }
241 free(statement);
242 }
243 break;
244 case pg_exec_prepared:
245 /* read statement name */
246 if (read_string(&name)) {
247 /* number of bind arguments (2 byte) */
248 if (do_read(&u16, 2, NULL)) {
249 count = ntohs(u16);
250 if (NULL == (values = calloc(count, sizeof(char *)))) {
251 fprintf(stderr, "Cannot allocate %lu bytes of memory\n", count * sizeof(char *));
252 } else {
253 /* read bind values */
254 while (i < count) {
255 if (read_string(values + i)) {
256 ++i;
257 } else {
258 break;
259 }
260 }
261 if (i == count) {
262 r = replay_create_exec_prepared(&tv, session_id, name, count, values);
263 }
264 while (--i >= 0) {
265 if (values[i]) {
266 free(values[i]);
267 }
268 }
269 free(values);
270 }
271 }
272 free(name);
273 }
274 break;
275 case pg_cancel:
276 r = replay_create_cancel(&tv, session_id);
277 break;
278 }
279 }
280
281 /* read new-line at the end of the record */
282 if (r && do_read(&nl, 1, NULL) && ('\n' != nl)) {
283 fprintf(stderr, "Error: missing new-line at end of line\n");
284 if (r) {
285 replay_free(r);
286 r = NULL;
287 }
288 }
289
290 if (r && (1 <= debug_level) && (end_item != r)) {
291 replay_print_debug(r);
292 }
293
294 debug(3, "Leaving file_provider%s\n", "");
295
296 return r;
297 }
298
file_consumer_init(const char * outfile,const char * host,int port,const char * passwd,double factor)299 int file_consumer_init(const char *outfile, const char *host, int port, const char *passwd, double factor) {
300 debug(3, "Entering file_consumer_init%s\n", "");
301
302 if ((NULL == outfile) || ('\0' == outfile[0])
303 || (('-' == outfile[0]) && ('\0' == outfile[1]))) {
304 filed = 1;
305 #ifdef WINDOWS
306 /* set stdout to binary mode */
307 setmode(filed, O_BINARY);
308 #endif
309 } else {
310 if (-1 == (filed = open(outfile, O_WRONLY | O_CREAT | O_TRUNC
311 #ifdef WINDOWS
312 | O_BINARY
313 #endif
314 , FILE_MODE))) {
315 perror("Error opening output file:");
316 return 0;
317 }
318 }
319
320 debug(3, "Leaving file_consumer_init%s\n", "");
321 return 1;
322 }
323
file_consumer_finish()324 void file_consumer_finish() {
325 debug(3, "Entering file_consumer_finish%s\n", "");
326
327 if (1 != filed) {
328 if (close(filed)) {
329 perror("Error closing output file:");
330 }
331 }
332
333 debug(3, "Leaving file_consumer_finish%s\n", "");
334 }
335
file_consumer(replay_item * item)336 int file_consumer(replay_item *item) {
337 const struct timeval *tv = replay_get_time(item);
338 uint16_t count;
339 const replay_type type = replay_get_type(item);
340 uint16_t u16, i;
341 uint32_t u32;
342 uint64_t u64;
343 int rc = 1;
344 const char * const *values;
345
346 debug(3, "Entering file_consumer%s\n", "");
347
348 /* write timestamp (8 byte) */
349 u32 = htonl(tv->tv_sec);
350 if (! do_write(&u32, 4)) {
351 rc = -1;
352 } else {
353 u32 = htonl(tv->tv_usec);
354 if (! do_write(&u32, 4)) {
355 rc = -1;
356 }
357 }
358
359 /* write session_id (8 byte) */
360 if (1 == rc) {
361 u64 = htonll(replay_get_session_id(item));
362 if (! do_write(&u64, 8)) {
363 rc = -1;
364 }
365 }
366
367 /* write type (1 byte) */
368 if (1 == rc) {
369 u16 = htons((uint16_t) type);
370 if (! do_write((char *)(&u16) + 1, 1)) {
371 rc = -1;
372 }
373 }
374
375 /* write type specific stuff */
376 if (1 == rc) {
377 switch (type) {
378 case pg_connect:
379 if (! write_string(replay_get_user(item))) {
380 rc = -1;
381 } else if (! write_string(replay_get_database(item))) {
382 rc = -1;
383 }
384 break;
385 case pg_disconnect:
386 break;
387 case pg_execute:
388 if (! write_string(replay_get_statement(item))) {
389 rc = -1;
390 }
391 break;
392 case pg_prepare:
393 if (! write_string(replay_get_statement(item))) {
394 rc = -1;
395 } else if (! write_string(replay_get_name(item))) {
396 rc = -1;
397 }
398 break;
399 case pg_exec_prepared:
400 count = replay_get_valuecount(item);
401 /* write statement name */
402 if (! write_string(replay_get_name(item))) {
403 rc = -1;
404 } else {
405 /* write count (2 byte) */
406 u16 = htons(count);
407 if (! do_write(&u16, 2)) {
408 rc = -1;
409 } else {
410 /* write values */
411 values = replay_get_values(item);
412 for (i=0; i<count; ++i) {
413 if (! write_string(values[i])) {
414 rc = -1;
415 break;
416 }
417 }
418 }
419 }
420 break;
421 case pg_cancel:
422 break;
423 }
424 }
425
426 /* write new-line (1 byte) */
427 if (1 == rc) {
428 if (! do_write("\n", 1)) {
429 rc = -1;
430 }
431 }
432
433 replay_free(item);
434
435 debug(3, "Leaving file_consumer%s\n", "");
436 return rc;
437 }
438