1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
31
32 #ifdef __MACH__ // OS X does not have clock_gettime
33 #include <mach/clock.h>
34 #include <mach/mach.h>
35 #include <mach/mach_time.h>
36 #endif
37
38 #include "config.h"
39 #include "hdfs.h"
40
/* Bytes per chunk written when (re)creating the test file; VECSUM_LENGTH
 * must be a multiple of this. */
#define VECSUM_CHUNK_SIZE (8 << 20)
/* Bytes requested per hadoopReadZero() call on the zero-copy path. */
#define ZCR_READ_CHUNK_SIZE (8 << 20)
/* Bytes requested per hdfsReadFully() call on the plain libhdfs path. */
#define NORMAL_READ_CHUNK_SIZE (8 << 20)
/* Doubles consumed by each iteration of the unrolled SSE summation loop. */
#define DOUBLES_PER_LOOP_ITER 16
45
/* Convert a timespec into a (fractional) number of seconds. */
static double timespec_to_double(const struct timespec *ts)
{
    const double whole_seconds = (double)ts->tv_sec;
    const double fraction = (double)ts->tv_nsec / 1000000000L;

    return whole_seconds + fraction;
}
52
/*
 * Simple wall-clock stopwatch: 'start' is filled in by stopwatch_create()
 * and 'stop' by stopwatch_stop(), both via clock_gettime_mono().
 */
struct stopwatch {
    struct timespec start, stop;
};
57
58
59 #ifdef __MACH__
clock_gettime_mono(struct timespec * ts)60 static int clock_gettime_mono(struct timespec * ts) {
61 static mach_timebase_info_data_t tb;
62 static uint64_t timestart = 0;
63 uint64_t t = 0;
64 if (timestart == 0) {
65 mach_timebase_info(&tb);
66 timestart = mach_absolute_time();
67 }
68 t = mach_absolute_time() - timestart;
69 t *= tb.numer;
70 t /= tb.denom;
71 ts->tv_sec = t / 1000000000ULL;
72 ts->tv_nsec = t - (ts->tv_sec * 1000000000ULL);
73 return 0;
74 }
75 #else
clock_gettime_mono(struct timespec * ts)76 static int clock_gettime_mono(struct timespec * ts) {
77 return clock_gettime(CLOCK_MONOTONIC, ts);
78 }
79 #endif
80
stopwatch_create(void)81 static struct stopwatch *stopwatch_create(void)
82 {
83 struct stopwatch *watch;
84
85 watch = calloc(1, sizeof(struct stopwatch));
86 if (!watch) {
87 fprintf(stderr, "failed to allocate memory for stopwatch\n");
88 goto error;
89 }
90 if (clock_gettime_mono(&watch->start)) {
91 int err = errno;
92 fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
93 "error %d (%s)\n", err, strerror(err));
94 goto error;
95 }
96 return watch;
97
98 error:
99 free(watch);
100 return NULL;
101 }
102
stopwatch_stop(struct stopwatch * watch,long long bytes_read)103 static void stopwatch_stop(struct stopwatch *watch,
104 long long bytes_read)
105 {
106 double elapsed, rate;
107
108 if (clock_gettime_mono(&watch->stop)) {
109 int err = errno;
110 fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
111 "error %d (%s)\n", err, strerror(err));
112 goto done;
113 }
114 elapsed = timespec_to_double(&watch->stop) -
115 timespec_to_double(&watch->start);
116 rate = (bytes_read / elapsed) / (1024 * 1024 * 1024);
117 printf("stopwatch: took %.5g seconds to read %lld bytes, "
118 "for %.5g GB/s\n", elapsed, bytes_read, rate);
119 printf("stopwatch: %.5g seconds\n", elapsed);
120 done:
121 free(watch);
122 }
123
/* Which read path main() benchmarks (selected by VECSUM_TYPE). */
enum vecsum_type {
    VECSUM_LOCAL = 0,   /* mmap() a local file */
    VECSUM_LIBHDFS = 1, /* hdfsReadFully() into a malloc()ed buffer */
    VECSUM_ZCR = 2,     /* zero-copy reads via hadoopReadZero() */
};

/* Accepted VECSUM_TYPE spellings, for use in error messages. */
#define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"
131
parse_vecsum_type(const char * str)132 int parse_vecsum_type(const char *str)
133 {
134 if (strcasecmp(str, "local") == 0)
135 return VECSUM_LOCAL;
136 else if (strcasecmp(str, "libhdfs") == 0)
137 return VECSUM_LIBHDFS;
138 else if (strcasecmp(str, "zcr") == 0)
139 return VECSUM_ZCR;
140 else
141 return -1;
142 }
143
/*
 * Benchmark configuration, filled in from environment variables by
 * options_create() and released with options_free().
 */
struct options {
    // The path to read (VECSUM_PATH). Points into the environment; the
    // struct does not own it.
    const char *path;

    // Length of the file in bytes (VECSUM_LENGTH, default 2147483648).
    // options_create() requires a multiple of VECSUM_CHUNK_SIZE.
    long long length;

    // The number of times to read the path (VECSUM_PASSES, must be > 0).
    int passes;

    // Type of vecsum to do (VECSUM_TYPE).
    enum vecsum_type ty;

    // RPC address to use for HDFS (VECSUM_RPC_ADDRESS, default "default").
    // Points into the environment or at a string literal; not owned.
    const char *rpc_address;
};
160
options_create(void)161 static struct options *options_create(void)
162 {
163 struct options *opts = NULL;
164 const char *pass_str;
165 const char *ty_str;
166 const char *length_str;
167 int ty;
168
169 opts = calloc(1, sizeof(struct options));
170 if (!opts) {
171 fprintf(stderr, "failed to calloc options\n");
172 goto error;
173 }
174 opts->path = getenv("VECSUM_PATH");
175 if (!opts->path) {
176 fprintf(stderr, "You must set the VECSUM_PATH environment "
177 "variable to the path of the file to read.\n");
178 goto error;
179 }
180 length_str = getenv("VECSUM_LENGTH");
181 if (!length_str) {
182 length_str = "2147483648";
183 }
184 opts->length = atoll(length_str);
185 if (!opts->length) {
186 fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n",
187 length_str);
188 goto error;
189 }
190 if (opts->length % VECSUM_CHUNK_SIZE) {
191 fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'. The "
192 "currently specified length of '%lld' is not.\n",
193 (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
194 goto error;
195 }
196 pass_str = getenv("VECSUM_PASSES");
197 if (!pass_str) {
198 fprintf(stderr, "You must set the VECSUM_PASSES environment "
199 "variable to the number of passes to make.\n");
200 goto error;
201 }
202 opts->passes = atoi(pass_str);
203 if (opts->passes <= 0) {
204 fprintf(stderr, "Invalid value for the VECSUM_PASSES "
205 "environment variable. You must set this to a "
206 "number greater than 0.\n");
207 goto error;
208 }
209 ty_str = getenv("VECSUM_TYPE");
210 if (!ty_str) {
211 fprintf(stderr, "You must set the VECSUM_TYPE environment "
212 "variable to " VECSUM_TYPE_VALID_VALUES "\n");
213 goto error;
214 }
215 ty = parse_vecsum_type(ty_str);
216 if (ty < 0) {
217 fprintf(stderr, "Invalid VECSUM_TYPE environment variable. "
218 "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
219 goto error;
220 }
221 opts->ty = ty;
222 opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
223 if (!opts->rpc_address) {
224 opts->rpc_address = "default";
225 }
226 return opts;
227 error:
228 free(opts);
229 return NULL;
230 }
231
test_file_chunk_setup(double ** chunk)232 static int test_file_chunk_setup(double **chunk)
233 {
234 int i;
235 double *c, val;
236
237 c = malloc(VECSUM_CHUNK_SIZE);
238 if (!c) {
239 fprintf(stderr, "test_file_create: failed to malloc "
240 "a buffer of size '%lld'\n",
241 (long long) VECSUM_CHUNK_SIZE);
242 return EIO;
243 }
244 val = 0.0;
245 for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) {
246 c[i] = val;
247 val += 0.5;
248 }
249 *chunk = c;
250 return 0;
251 }
252
/*
 * Release an options struct returned by options_create(). Only the struct
 * itself is heap-allocated: its string fields point into the environment or
 * at string literals. Passing NULL is harmless.
 */
static void options_free(struct options *opts)
{
    free(opts);
}
257
/* State for the VECSUM_LOCAL benchmark; built by local_data_create(). */
struct local_data {
    // Read/write descriptor for the local file (-1 until opened).
    int fd;
    // Read-only mapping of the first 'length' bytes of the file, or
    // MAP_FAILED until the mapping has been established.
    double *mmap;
    // Length of the mapping in bytes (copied from options.length).
    long long length;
};
263
local_data_create_file(struct local_data * cdata,const struct options * opts)264 static int local_data_create_file(struct local_data *cdata,
265 const struct options *opts)
266 {
267 int ret = EIO;
268 int dup_fd = -1;
269 FILE *fp = NULL;
270 double *chunk = NULL;
271 long long offset = 0;
272
273 dup_fd = dup(cdata->fd);
274 if (dup_fd < 0) {
275 ret = errno;
276 fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
277 strerror(ret), ret);
278 goto done;
279 }
280 fp = fdopen(dup_fd, "w");
281 if (!fp) {
282 ret = errno;
283 fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
284 strerror(ret), ret);
285 goto done;
286 }
287 ret = test_file_chunk_setup(&chunk);
288 if (ret)
289 goto done;
290 while (offset < opts->length) {
291 if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
292 fprintf(stderr, "local_data_create_file: failed to write to "
293 "the local file '%s' at offset %lld\n",
294 opts->path, offset);
295 ret = EIO;
296 goto done;
297 }
298 offset += VECSUM_CHUNK_SIZE;
299 }
300 fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
301 "a file of length %lld\n", opts->path, opts->length);
302 ret = 0;
303
304 done:
305 if (dup_fd >= 0) {
306 close(dup_fd);
307 }
308 if (fp) {
309 fclose(fp);
310 }
311 free(chunk);
312 return ret;
313 }
314
local_data_create(const struct options * opts)315 static struct local_data *local_data_create(const struct options *opts)
316 {
317 struct local_data *cdata = NULL;
318 struct stat st_buf;
319
320 cdata = malloc(sizeof(*cdata));
321 if (!cdata) {
322 fprintf(stderr, "Failed to allocate local test data.\n");
323 goto error;
324 }
325 cdata->fd = -1;
326 cdata->mmap = MAP_FAILED;
327 cdata->length = opts->length;
328
329 cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
330 if (cdata->fd < 0) {
331 int err = errno;
332 fprintf(stderr, "local_data_create: failed to open %s "
333 "for read/write: error %d (%s)\n", opts->path, err, strerror(err));
334 goto error;
335 }
336 if (fstat(cdata->fd, &st_buf)) {
337 int err = errno;
338 fprintf(stderr, "local_data_create: fstat(%s) failed: "
339 "error %d (%s)\n", opts->path, err, strerror(err));
340 goto error;
341 }
342 if (st_buf.st_size != opts->length) {
343 int err;
344 fprintf(stderr, "local_data_create: current size of %s is %lld, but "
345 "we want %lld. Re-writing the file.\n",
346 opts->path, (long long)st_buf.st_size,
347 (long long)opts->length);
348 err = local_data_create_file(cdata, opts);
349 if (err)
350 goto error;
351 }
352 cdata->mmap = mmap(NULL, cdata->length, PROT_READ,
353 MAP_PRIVATE, cdata->fd, 0);
354 if (cdata->mmap == MAP_FAILED) {
355 int err = errno;
356 fprintf(stderr, "local_data_create: mmap(%s) failed: "
357 "error %d (%s)\n", opts->path, err, strerror(err));
358 goto error;
359 }
360 return cdata;
361
362 error:
363 if (cdata) {
364 if (cdata->fd >= 0) {
365 close(cdata->fd);
366 }
367 free(cdata);
368 }
369 return NULL;
370 }
371
local_data_free(struct local_data * cdata)372 static void local_data_free(struct local_data *cdata)
373 {
374 close(cdata->fd);
375 munmap(cdata->mmap, cdata->length);
376 }
377
/*
 * State for the VECSUM_LIBHDFS and VECSUM_ZCR benchmarks; built by
 * libhdfs_data_create().
 */
struct libhdfs_data {
    // Filesystem connection; NULL until hdfsBuilderConnect() succeeds.
    hdfsFS fs;
    // The benchmark file, opened read-only once setup has completed.
    hdfsFile file;
    // Length of the file in bytes (copied from options.length).
    long long length;
    // NORMAL_READ_CHUNK_SIZE-byte read buffer allocated by vecsum_libhdfs().
    // Stays NULL on the zero-copy path.
    double *buf;
};
384
libhdfs_data_free(struct libhdfs_data * ldata)385 static void libhdfs_data_free(struct libhdfs_data *ldata)
386 {
387 if (ldata->fs) {
388 free(ldata->buf);
389 if (ldata->file) {
390 hdfsCloseFile(ldata->fs, ldata->file);
391 }
392 hdfsDisconnect(ldata->fs);
393 }
394 free(ldata);
395 }
396
libhdfs_data_create_file(struct libhdfs_data * ldata,const struct options * opts)397 static int libhdfs_data_create_file(struct libhdfs_data *ldata,
398 const struct options *opts)
399 {
400 int ret;
401 double *chunk = NULL;
402 long long offset = 0;
403
404 ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
405 if (!ldata->file) {
406 ret = errno;
407 fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
408 "O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
409 strerror(ret));
410 goto done;
411 }
412 ret = test_file_chunk_setup(&chunk);
413 if (ret)
414 goto done;
415 while (offset < opts->length) {
416 ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
417 if (ret < 0) {
418 ret = errno;
419 fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
420 "offset %lld of %s\n", ret, strerror(ret),
421 offset, opts->path);
422 goto done;
423 } else if (ret < VECSUM_CHUNK_SIZE) {
424 fprintf(stderr, "libhdfs_data_create_file: got short write "
425 "of %d at offset %lld of %s\n", ret, offset, opts->path);
426 goto done;
427 }
428 offset += VECSUM_CHUNK_SIZE;
429 }
430 ret = 0;
431 done:
432 free(chunk);
433 if (ldata->file) {
434 if (hdfsCloseFile(ldata->fs, ldata->file)) {
435 fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
436 ret = EIO;
437 }
438 ldata->file = NULL;
439 }
440 return ret;
441 }
442
libhdfs_data_create(const struct options * opts)443 static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
444 {
445 struct libhdfs_data *ldata = NULL;
446 struct hdfsBuilder *builder = NULL;
447 hdfsFileInfo *pinfo = NULL;
448
449 ldata = calloc(1, sizeof(struct libhdfs_data));
450 if (!ldata) {
451 fprintf(stderr, "Failed to allocate libhdfs test data.\n");
452 goto error;
453 }
454 builder = hdfsNewBuilder();
455 if (!builder) {
456 fprintf(stderr, "Failed to create builder.\n");
457 goto error;
458 }
459 hdfsBuilderSetNameNode(builder, opts->rpc_address);
460 hdfsBuilderConfSetStr(builder,
461 "dfs.client.read.shortcircuit.skip.checksum", "true");
462 ldata->fs = hdfsBuilderConnect(builder);
463 if (!ldata->fs) {
464 fprintf(stderr, "Could not connect to default namenode!\n");
465 goto error;
466 }
467 pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
468 if (!pinfo) {
469 int err = errno;
470 fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s). "
471 "Attempting to re-create file.\n",
472 opts->path, err, strerror(err));
473 if (libhdfs_data_create_file(ldata, opts))
474 goto error;
475 } else if (pinfo->mSize != opts->length) {
476 fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, "
477 "but we want length %lld. Attempting to re-create file.\n",
478 opts->path, (long long)pinfo->mSize, (long long)opts->length);
479 if (libhdfs_data_create_file(ldata, opts))
480 goto error;
481 }
482 ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0);
483 if (!ldata->file) {
484 int err = errno;
485 fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n",
486 opts->path, err, strerror(err));
487 goto error;
488 }
489 ldata->length = opts->length;
490 return ldata;
491
492 error:
493 if (pinfo)
494 hdfsFreeFileInfo(pinfo, 1);
495 if (ldata)
496 libhdfs_data_free(ldata);
497 return NULL;
498 }
499
check_byte_size(int byte_size,const char * const str)500 static int check_byte_size(int byte_size, const char *const str)
501 {
502 if (byte_size % sizeof(double)) {
503 fprintf(stderr, "%s is not a multiple "
504 "of sizeof(double)\n", str);
505 return EINVAL;
506 }
507 if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) {
508 fprintf(stderr, "The number of doubles contained in "
509 "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n",
510 str);
511 return EINVAL;
512 }
513 return 0;
514 }
515
#ifdef HAVE_INTEL_SSE_INTRINSICS

#include <emmintrin.h>

/*
 * Return the sum of the first 'num_doubles' doubles in 'buf', using SSE2.
 *
 * The main loop keeps eight independent two-lane accumulators and consumes
 * DOUBLES_PER_LOOP_ITER doubles per iteration. Any remainder is added by a
 * scalar tail loop, so no element past buf[num_doubles - 1] is read.
 * Unaligned loads are used because callers (e.g. zero-copy buffers from
 * hadoopRzBufferGet()) do not guarantee 16-byte alignment.
 */
static double vecsum(const double *buf, int num_doubles)
{
    int i;
    double hi, lo, sum;
    __m128d x0, x1, x2, x3, x4, x5, x6, x7;
    __m128d sum0 = _mm_setzero_pd();
    __m128d sum1 = _mm_setzero_pd();
    __m128d sum2 = _mm_setzero_pd();
    __m128d sum3 = _mm_setzero_pd();
    __m128d sum4 = _mm_setzero_pd();
    __m128d sum5 = _mm_setzero_pd();
    __m128d sum6 = _mm_setzero_pd();
    __m128d sum7 = _mm_setzero_pd();

    for (i = 0; i <= num_doubles - DOUBLES_PER_LOOP_ITER;
         i += DOUBLES_PER_LOOP_ITER) {
        x0 = _mm_loadu_pd(buf + i + 0);
        x1 = _mm_loadu_pd(buf + i + 2);
        x2 = _mm_loadu_pd(buf + i + 4);
        x3 = _mm_loadu_pd(buf + i + 6);
        x4 = _mm_loadu_pd(buf + i + 8);
        x5 = _mm_loadu_pd(buf + i + 10);
        x6 = _mm_loadu_pd(buf + i + 12);
        x7 = _mm_loadu_pd(buf + i + 14);
        sum0 = _mm_add_pd(sum0, x0);
        sum1 = _mm_add_pd(sum1, x1);
        sum2 = _mm_add_pd(sum2, x2);
        sum3 = _mm_add_pd(sum3, x3);
        sum4 = _mm_add_pd(sum4, x4);
        sum5 = _mm_add_pd(sum5, x5);
        sum6 = _mm_add_pd(sum6, x6);
        sum7 = _mm_add_pd(sum7, x7);
    }
    // Reduce the eight accumulators pairwise, then add the two lanes.
    x0 = _mm_add_pd(sum0, sum1);
    x1 = _mm_add_pd(sum2, sum3);
    x2 = _mm_add_pd(sum4, sum5);
    x3 = _mm_add_pd(sum6, sum7);
    x4 = _mm_add_pd(x0, x1);
    x5 = _mm_add_pd(x2, x3);
    x6 = _mm_add_pd(x4, x5);
    _mm_storeh_pd(&hi, x6);
    _mm_storel_pd(&lo, x6);
    sum = hi + lo;
    // Scalar tail for counts that are not a multiple of the unroll factor.
    for (; i < num_doubles; i++) {
        sum += buf[i];
    }
    return sum;
}

#else

/* Portable fallback: plain left-to-right sum of num_doubles doubles. */
static double vecsum(const double *buf, int num_doubles)
{
    int i;
    double sum = 0.0;
    for (i = 0; i < num_doubles; i++) {
        sum += buf[i];
    }
    return sum;
}

#endif
576
vecsum_zcr_loop(int pass,struct libhdfs_data * ldata,struct hadoopRzOptions * zopts,const struct options * opts)577 static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata,
578 struct hadoopRzOptions *zopts,
579 const struct options *opts)
580 {
581 int32_t len;
582 double sum = 0.0;
583 const double *buf;
584 struct hadoopRzBuffer *rzbuf = NULL;
585 int ret;
586
587 while (1) {
588 rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE);
589 if (!rzbuf) {
590 ret = errno;
591 fprintf(stderr, "hadoopReadZero failed with error "
592 "code %d (%s)\n", ret, strerror(ret));
593 goto done;
594 }
595 buf = hadoopRzBufferGet(rzbuf);
596 if (!buf) break;
597 len = hadoopRzBufferLength(rzbuf);
598 if (len < ZCR_READ_CHUNK_SIZE) {
599 fprintf(stderr, "hadoopReadZero got a partial read "
600 "of length %d\n", len);
601 ret = EINVAL;
602 goto done;
603 }
604 sum += vecsum(buf,
605 ZCR_READ_CHUNK_SIZE / sizeof(double));
606 hadoopRzBufferFree(ldata->file, rzbuf);
607 }
608 printf("finished zcr pass %d. sum = %g\n", pass, sum);
609 ret = 0;
610
611 done:
612 if (rzbuf)
613 hadoopRzBufferFree(ldata->file, rzbuf);
614 return ret;
615 }
616
vecsum_zcr(struct libhdfs_data * ldata,const struct options * opts)617 static int vecsum_zcr(struct libhdfs_data *ldata,
618 const struct options *opts)
619 {
620 int ret, pass;
621 struct hadoopRzOptions *zopts = NULL;
622
623 zopts = hadoopRzOptionsAlloc();
624 if (!zopts) {
625 fprintf(stderr, "hadoopRzOptionsAlloc failed.\n");
626 ret = ENOMEM;
627 goto done;
628 }
629 if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) {
630 ret = errno;
631 perror("hadoopRzOptionsSetSkipChecksum failed: ");
632 goto done;
633 }
634 if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) {
635 ret = errno;
636 perror("hadoopRzOptionsSetByteBufferPool failed: ");
637 goto done;
638 }
639 for (pass = 0; pass < opts->passes; ++pass) {
640 ret = vecsum_zcr_loop(pass, ldata, zopts, opts);
641 if (ret) {
642 fprintf(stderr, "vecsum_zcr_loop pass %d failed "
643 "with error %d\n", pass, ret);
644 goto done;
645 }
646 hdfsSeek(ldata->fs, ldata->file, 0);
647 }
648 ret = 0;
649 done:
650 if (zopts)
651 hadoopRzOptionsFree(zopts);
652 return ret;
653 }
654
/*
 * Read up to 'length' bytes from 'f' into 'buffer', calling hdfsRead()
 * repeatedly until the buffer is full or EOF is reached. Reads interrupted
 * by EINTR are retried.
 *
 * Returns the number of bytes read (less than 'length' only at EOF), or -1
 * if hdfsRead() fails with any error other than EINTR. Bytes already copied
 * into 'buffer' before such a failure are not reported.
 */
tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    uint8_t *buf = buffer;
    tSize nread = 0;

    while (length > 0) {
        tSize ret = hdfsRead(fs, f, buf, length);
        if (ret < 0) {
            if (errno == EINTR)
                continue;   // Interrupted: retry. Never treat -1 as a count.
            return -1;
        }
        if (ret == 0) {
            break;          // EOF
        }
        nread += ret;
        length -= ret;
        buf += ret;
    }
    return nread;
}
676
vecsum_normal_loop(int pass,const struct libhdfs_data * ldata,const struct options * opts)677 static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata,
678 const struct options *opts)
679 {
680 double sum = 0.0;
681
682 while (1) {
683 int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf,
684 NORMAL_READ_CHUNK_SIZE);
685 if (res == 0) // EOF
686 break;
687 if (res < 0) {
688 int err = errno;
689 fprintf(stderr, "hdfsRead failed with error %d (%s)\n",
690 err, strerror(err));
691 return err;
692 }
693 if (res < NORMAL_READ_CHUNK_SIZE) {
694 fprintf(stderr, "hdfsRead got a partial read of "
695 "length %d\n", res);
696 return EINVAL;
697 }
698 sum += vecsum(ldata->buf,
699 NORMAL_READ_CHUNK_SIZE / sizeof(double));
700 }
701 printf("finished normal pass %d. sum = %g\n", pass, sum);
702 return 0;
703 }
704
vecsum_libhdfs(struct libhdfs_data * ldata,const struct options * opts)705 static int vecsum_libhdfs(struct libhdfs_data *ldata,
706 const struct options *opts)
707 {
708 int pass;
709
710 ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE);
711 if (!ldata->buf) {
712 fprintf(stderr, "failed to malloc buffer of size %d\n",
713 NORMAL_READ_CHUNK_SIZE);
714 return ENOMEM;
715 }
716 for (pass = 0; pass < opts->passes; ++pass) {
717 int ret = vecsum_normal_loop(pass, ldata, opts);
718 if (ret) {
719 fprintf(stderr, "vecsum_normal_loop pass %d failed "
720 "with error %d\n", pass, ret);
721 return ret;
722 }
723 hdfsSeek(ldata->fs, ldata->file, 0);
724 }
725 return 0;
726 }
727
vecsum_local(struct local_data * cdata,const struct options * opts)728 static void vecsum_local(struct local_data *cdata, const struct options *opts)
729 {
730 int pass;
731
732 for (pass = 0; pass < opts->passes; pass++) {
733 double sum = vecsum(cdata->mmap, cdata->length / sizeof(double));
734 printf("finished vecsum_local pass %d. sum = %g\n", pass, sum);
735 }
736 }
737
vecsum_length(const struct options * opts,const struct libhdfs_data * ldata)738 static long long vecsum_length(const struct options *opts,
739 const struct libhdfs_data *ldata)
740 {
741 if (opts->ty == VECSUM_LOCAL) {
742 struct stat st_buf = { 0 };
743 if (stat(opts->path, &st_buf)) {
744 int err = errno;
745 fprintf(stderr, "vecsum_length: stat(%s) failed: "
746 "error %d (%s)\n", opts->path, err, strerror(err));
747 return -EIO;
748 }
749 return st_buf.st_size;
750 } else {
751 return ldata->length;
752 }
753 }
754
/*
 * vecsum is a microbenchmark that measures the speed of various ways of
 * reading a file from HDFS, or, for comparison, from the local filesystem.
 * It creates a file of floating-point 'doubles' (re-creating it if an
 * existing file does not have the requested length), then computes the sum
 * of all the doubles several times. When SSE2 intrinsics are available, they
 * are used to speed up the summation.
 */
main(void)761 int main(void)
762 {
763 int ret = 1;
764 struct options *opts = NULL;
765 struct local_data *cdata = NULL;
766 struct libhdfs_data *ldata = NULL;
767 struct stopwatch *watch = NULL;
768
769 if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
770 check_byte_size(ZCR_READ_CHUNK_SIZE,
771 "ZCR_READ_CHUNK_SIZE") ||
772 check_byte_size(NORMAL_READ_CHUNK_SIZE,
773 "NORMAL_READ_CHUNK_SIZE")) {
774 goto done;
775 }
776 opts = options_create();
777 if (!opts)
778 goto done;
779 if (opts->ty == VECSUM_LOCAL) {
780 cdata = local_data_create(opts);
781 if (!cdata)
782 goto done;
783 } else {
784 ldata = libhdfs_data_create(opts);
785 if (!ldata)
786 goto done;
787 }
788 watch = stopwatch_create();
789 if (!watch)
790 goto done;
791 switch (opts->ty) {
792 case VECSUM_LOCAL:
793 vecsum_local(cdata, opts);
794 ret = 0;
795 break;
796 case VECSUM_LIBHDFS:
797 ret = vecsum_libhdfs(ldata, opts);
798 break;
799 case VECSUM_ZCR:
800 ret = vecsum_zcr(ldata, opts);
801 break;
802 }
803 if (ret) {
804 fprintf(stderr, "vecsum failed with error %d\n", ret);
805 goto done;
806 }
807 ret = 0;
808 done:
809 fprintf(stderr, "cleaning up...\n");
810 if (watch && (ret == 0)) {
811 long long length = vecsum_length(opts, ldata);
812 if (length >= 0) {
813 stopwatch_stop(watch, length * opts->passes);
814 }
815 }
816 if (cdata)
817 local_data_free(cdata);
818 if (ldata)
819 libhdfs_data_free(ldata);
820 if (opts)
821 options_free(opts);
822 return ret;
823 }
824
825 // vim: ts=4:sw=4:tw=79:et
826