1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
31 
32 #ifdef __MACH__ // OS X does not have clock_gettime
33 #include <mach/clock.h>
34 #include <mach/mach.h>
35 #include <mach/mach_time.h>
36 #endif
37 
38 #include "config.h"
39 #include "hdfs.h"
40 
/* Bytes per chunk when (re)writing the test file; the requested file length
 * must be a multiple of this. */
#define VECSUM_CHUNK_SIZE (8 * 1024 * 1024)
/* Bytes requested from each hadoopReadZero() call. */
#define ZCR_READ_CHUNK_SIZE (8 * 1024 * 1024)
/* Bytes requested from each hdfsReadFully() call. */
#define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024)
/* Number of doubles the SSE summation loop consumes per iteration. */
#define DOUBLES_PER_LOOP_ITER 16
45 
/**
 * Express a timespec as a floating-point number of seconds.
 *
 * @param ts  Timestamp to convert; must not be NULL.
 * @return    tv_sec plus tv_nsec scaled down by one billion.
 */
static double timespec_to_double(const struct timespec *ts)
{
    const double whole_seconds = (double)ts->tv_sec;
    const double fraction = (double)ts->tv_nsec / 1000000000.0;

    return whole_seconds + fraction;
}
52 
/* A pair of monotonic timestamps bracketing one timed run. */
struct stopwatch {
    struct timespec start; /* filled in by stopwatch_create() */
    struct timespec stop;  /* filled in by stopwatch_stop() */
};
57 
58 
59 #ifdef __MACH__
clock_gettime_mono(struct timespec * ts)60 static int clock_gettime_mono(struct timespec * ts) {
61     static mach_timebase_info_data_t tb;
62     static uint64_t timestart = 0;
63     uint64_t t = 0;
64     if (timestart == 0) {
65         mach_timebase_info(&tb);
66         timestart = mach_absolute_time();
67     }
68     t = mach_absolute_time() - timestart;
69     t *= tb.numer;
70     t /= tb.denom;
71     ts->tv_sec = t / 1000000000ULL;
72     ts->tv_nsec = t - (ts->tv_sec * 1000000000ULL);
73     return 0;
74 }
75 #else
clock_gettime_mono(struct timespec * ts)76 static int clock_gettime_mono(struct timespec * ts) {
77     return clock_gettime(CLOCK_MONOTONIC, ts);
78 }
79 #endif
80 
stopwatch_create(void)81 static struct stopwatch *stopwatch_create(void)
82 {
83     struct stopwatch *watch;
84 
85     watch = calloc(1, sizeof(struct stopwatch));
86     if (!watch) {
87         fprintf(stderr, "failed to allocate memory for stopwatch\n");
88         goto error;
89     }
90     if (clock_gettime_mono(&watch->start)) {
91         int err = errno;
92         fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
93             "error %d (%s)\n", err, strerror(err));
94         goto error;
95     }
96     return watch;
97 
98 error:
99     free(watch);
100     return NULL;
101 }
102 
stopwatch_stop(struct stopwatch * watch,long long bytes_read)103 static void stopwatch_stop(struct stopwatch *watch,
104         long long bytes_read)
105 {
106     double elapsed, rate;
107 
108     if (clock_gettime_mono(&watch->stop)) {
109         int err = errno;
110         fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
111             "error %d (%s)\n", err, strerror(err));
112         goto done;
113     }
114     elapsed = timespec_to_double(&watch->stop) -
115         timespec_to_double(&watch->start);
116     rate = (bytes_read / elapsed) / (1024 * 1024 * 1024);
117     printf("stopwatch: took %.5g seconds to read %lld bytes, "
118         "for %.5g GB/s\n", elapsed, bytes_read, rate);
119     printf("stopwatch:  %.5g seconds\n", elapsed);
120 done:
121     free(watch);
122 }
123 
/* The read path exercised by the benchmark. */
enum vecsum_type {
    VECSUM_LOCAL = 0,   /* sum an mmap of a local file */
    VECSUM_LIBHDFS,     /* sum data read through hdfsRead */
    VECSUM_ZCR,         /* sum data read through hadoopReadZero */
};

#define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"

/**
 * Translate a benchmark type name into its enum vecsum_type value.
 *
 * The comparison ignores case.  strcasecmp() is declared in <strings.h>,
 * so that header must be included by this file.
 *
 * @param str  Type name; NULL is tolerated and treated as unrecognized.
 * @return     The matching enum vecsum_type value, or -1 when the name is
 *             NULL or not one of "local", "libhdfs" or "zcr".
 */
int parse_vecsum_type(const char *str)
{
    static const struct {
        const char *name;
        enum vecsum_type ty;
    } known_types[] = {
        { "local",   VECSUM_LOCAL },
        { "libhdfs", VECSUM_LIBHDFS },
        { "zcr",     VECSUM_ZCR },
    };
    size_t i;

    if (str == NULL)
        return -1;
    for (i = 0; i < sizeof(known_types) / sizeof(known_types[0]); i++) {
        if (strcasecmp(str, known_types[i].name) == 0)
            return known_types[i].ty;
    }
    return -1;
}
143 
144 struct options {
145     // The path to read.
146     const char *path;
147 
148     // Length of the file.
149     long long length;
150 
151     // The number of times to read the path.
152     int passes;
153 
154     // Type of vecsum to do
155     enum vecsum_type ty;
156 
157     // RPC address to use for HDFS
158     const char *rpc_address;
159 };
160 
options_create(void)161 static struct options *options_create(void)
162 {
163     struct options *opts = NULL;
164     const char *pass_str;
165     const char *ty_str;
166     const char *length_str;
167     int ty;
168 
169     opts = calloc(1, sizeof(struct options));
170     if (!opts) {
171         fprintf(stderr, "failed to calloc options\n");
172         goto error;
173     }
174     opts->path = getenv("VECSUM_PATH");
175     if (!opts->path) {
176         fprintf(stderr, "You must set the VECSUM_PATH environment "
177             "variable to the path of the file to read.\n");
178         goto error;
179     }
180     length_str = getenv("VECSUM_LENGTH");
181     if (!length_str) {
182         length_str = "2147483648";
183     }
184     opts->length = atoll(length_str);
185     if (!opts->length) {
186         fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n",
187                 length_str);
188         goto error;
189     }
190     if (opts->length % VECSUM_CHUNK_SIZE) {
191         fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'.  The "
192                 "currently specified length of '%lld' is not.\n",
193                 (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
194         goto error;
195     }
196     pass_str = getenv("VECSUM_PASSES");
197     if (!pass_str) {
198         fprintf(stderr, "You must set the VECSUM_PASSES environment "
199             "variable to the number of passes to make.\n");
200         goto error;
201     }
202     opts->passes = atoi(pass_str);
203     if (opts->passes <= 0) {
204         fprintf(stderr, "Invalid value for the VECSUM_PASSES "
205             "environment variable.  You must set this to a "
206             "number greater than 0.\n");
207         goto error;
208     }
209     ty_str = getenv("VECSUM_TYPE");
210     if (!ty_str) {
211         fprintf(stderr, "You must set the VECSUM_TYPE environment "
212             "variable to " VECSUM_TYPE_VALID_VALUES "\n");
213         goto error;
214     }
215     ty = parse_vecsum_type(ty_str);
216     if (ty < 0) {
217         fprintf(stderr, "Invalid VECSUM_TYPE environment variable.  "
218             "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
219         goto error;
220     }
221     opts->ty = ty;
222     opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
223     if (!opts->rpc_address) {
224         opts->rpc_address = "default";
225     }
226     return opts;
227 error:
228     free(opts);
229     return NULL;
230 }
231 
test_file_chunk_setup(double ** chunk)232 static int test_file_chunk_setup(double **chunk)
233 {
234     int i;
235     double *c, val;
236 
237     c = malloc(VECSUM_CHUNK_SIZE);
238     if (!c) {
239         fprintf(stderr, "test_file_create: failed to malloc "
240                 "a buffer of size '%lld'\n",
241                 (long long) VECSUM_CHUNK_SIZE);
242         return EIO;
243     }
244     val = 0.0;
245     for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) {
246         c[i] = val;
247         val += 0.5;
248     }
249     *chunk = c;
250     return 0;
251 }
252 
/**
 * Release an options struct obtained from options_create().
 *
 * @param opts  Struct to free; NULL is accepted because free(NULL) is a
 *              no-op.
 */
static void options_free(struct options *opts)
{
    /* Only the struct itself is released; the strings are not freed. */
    free(opts);
}
257 
/* State for the local (mmap-based) benchmark. */
struct local_data {
    int fd;           /* descriptor of the open data file, or -1 */
    double *mmap;     /* read-only mapping of the file, or MAP_FAILED */
    long long length; /* number of bytes mapped */
};
263 
local_data_create_file(struct local_data * cdata,const struct options * opts)264 static int local_data_create_file(struct local_data *cdata,
265                                   const struct options *opts)
266 {
267     int ret = EIO;
268     int dup_fd = -1;
269     FILE *fp = NULL;
270     double *chunk = NULL;
271     long long offset = 0;
272 
273     dup_fd = dup(cdata->fd);
274     if (dup_fd < 0) {
275         ret = errno;
276         fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
277                 strerror(ret), ret);
278         goto done;
279     }
280     fp = fdopen(dup_fd, "w");
281     if (!fp) {
282         ret = errno;
283         fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
284                 strerror(ret), ret);
285         goto done;
286     }
287     ret = test_file_chunk_setup(&chunk);
288     if (ret)
289         goto done;
290     while (offset < opts->length) {
291         if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
292             fprintf(stderr, "local_data_create_file: failed to write to "
293                     "the local file '%s' at offset %lld\n",
294                     opts->path, offset);
295             ret = EIO;
296             goto done;
297         }
298         offset += VECSUM_CHUNK_SIZE;
299     }
300     fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
301             "a file of length %lld\n", opts->path, opts->length);
302     ret = 0;
303 
304 done:
305     if (dup_fd >= 0) {
306         close(dup_fd);
307     }
308     if (fp) {
309         fclose(fp);
310     }
311     free(chunk);
312     return ret;
313 }
314 
local_data_create(const struct options * opts)315 static struct local_data *local_data_create(const struct options *opts)
316 {
317     struct local_data *cdata = NULL;
318     struct stat st_buf;
319 
320     cdata = malloc(sizeof(*cdata));
321     if (!cdata) {
322         fprintf(stderr, "Failed to allocate local test data.\n");
323         goto error;
324     }
325     cdata->fd = -1;
326     cdata->mmap = MAP_FAILED;
327     cdata->length = opts->length;
328 
329     cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
330     if (cdata->fd < 0) {
331         int err = errno;
332         fprintf(stderr, "local_data_create: failed to open %s "
333             "for read/write: error %d (%s)\n", opts->path, err, strerror(err));
334         goto error;
335     }
336     if (fstat(cdata->fd, &st_buf)) {
337         int err = errno;
338         fprintf(stderr, "local_data_create: fstat(%s) failed: "
339             "error %d (%s)\n", opts->path, err, strerror(err));
340         goto error;
341     }
342     if (st_buf.st_size != opts->length) {
343         int err;
344         fprintf(stderr, "local_data_create: current size of %s is %lld, but "
345                 "we want %lld.  Re-writing the file.\n",
346                 opts->path, (long long)st_buf.st_size,
347                 (long long)opts->length);
348         err = local_data_create_file(cdata, opts);
349         if (err)
350             goto error;
351     }
352     cdata->mmap = mmap(NULL, cdata->length, PROT_READ,
353                        MAP_PRIVATE, cdata->fd, 0);
354     if (cdata->mmap == MAP_FAILED) {
355         int err = errno;
356         fprintf(stderr, "local_data_create: mmap(%s) failed: "
357             "error %d (%s)\n", opts->path, err, strerror(err));
358         goto error;
359     }
360     return cdata;
361 
362 error:
363     if (cdata) {
364         if (cdata->fd >= 0) {
365             close(cdata->fd);
366         }
367         free(cdata);
368     }
369     return NULL;
370 }
371 
local_data_free(struct local_data * cdata)372 static void local_data_free(struct local_data *cdata)
373 {
374     close(cdata->fd);
375     munmap(cdata->mmap, cdata->length);
376 }
377 
378 struct libhdfs_data {
379     hdfsFS fs;
380     hdfsFile file;
381     long long length;
382     double *buf;
383 };
384 
libhdfs_data_free(struct libhdfs_data * ldata)385 static void libhdfs_data_free(struct libhdfs_data *ldata)
386 {
387     if (ldata->fs) {
388         free(ldata->buf);
389         if (ldata->file) {
390             hdfsCloseFile(ldata->fs, ldata->file);
391         }
392         hdfsDisconnect(ldata->fs);
393     }
394     free(ldata);
395 }
396 
libhdfs_data_create_file(struct libhdfs_data * ldata,const struct options * opts)397 static int libhdfs_data_create_file(struct libhdfs_data *ldata,
398                                     const struct options *opts)
399 {
400     int ret;
401     double *chunk = NULL;
402     long long offset = 0;
403 
404     ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
405     if (!ldata->file) {
406         ret = errno;
407         fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
408             "O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
409             strerror(ret));
410         goto done;
411     }
412     ret = test_file_chunk_setup(&chunk);
413     if (ret)
414         goto done;
415     while (offset < opts->length) {
416         ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
417         if (ret < 0) {
418             ret = errno;
419             fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
420                     "offset %lld of %s\n", ret, strerror(ret),
421                     offset, opts->path);
422             goto done;
423         } else if (ret < VECSUM_CHUNK_SIZE) {
424             fprintf(stderr, "libhdfs_data_create_file: got short write "
425                     "of %d at offset %lld of %s\n", ret, offset, opts->path);
426             goto done;
427         }
428         offset += VECSUM_CHUNK_SIZE;
429     }
430     ret = 0;
431 done:
432     free(chunk);
433     if (ldata->file) {
434         if (hdfsCloseFile(ldata->fs, ldata->file)) {
435             fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
436             ret = EIO;
437         }
438         ldata->file = NULL;
439     }
440     return ret;
441 }
442 
libhdfs_data_create(const struct options * opts)443 static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
444 {
445     struct libhdfs_data *ldata = NULL;
446     struct hdfsBuilder *builder = NULL;
447     hdfsFileInfo *pinfo = NULL;
448 
449     ldata = calloc(1, sizeof(struct libhdfs_data));
450     if (!ldata) {
451         fprintf(stderr, "Failed to allocate libhdfs test data.\n");
452         goto error;
453     }
454     builder = hdfsNewBuilder();
455     if (!builder) {
456         fprintf(stderr, "Failed to create builder.\n");
457         goto error;
458     }
459     hdfsBuilderSetNameNode(builder, opts->rpc_address);
460     hdfsBuilderConfSetStr(builder,
461         "dfs.client.read.shortcircuit.skip.checksum", "true");
462     ldata->fs = hdfsBuilderConnect(builder);
463     if (!ldata->fs) {
464         fprintf(stderr, "Could not connect to default namenode!\n");
465         goto error;
466     }
467     pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
468     if (!pinfo) {
469         int err = errno;
470         fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s).  "
471                 "Attempting to re-create file.\n",
472             opts->path, err, strerror(err));
473         if (libhdfs_data_create_file(ldata, opts))
474             goto error;
475     } else if (pinfo->mSize != opts->length) {
476         fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, "
477                 "but we want length %lld.  Attempting to re-create file.\n",
478                 opts->path, (long long)pinfo->mSize, (long long)opts->length);
479         if (libhdfs_data_create_file(ldata, opts))
480             goto error;
481     }
482     ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0);
483     if (!ldata->file) {
484         int err = errno;
485         fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n",
486             opts->path, err, strerror(err));
487         goto error;
488     }
489     ldata->length = opts->length;
490     return ldata;
491 
492 error:
493     if (pinfo)
494         hdfsFreeFileInfo(pinfo, 1);
495     if (ldata)
496         libhdfs_data_free(ldata);
497     return NULL;
498 }
499 
check_byte_size(int byte_size,const char * const str)500 static int check_byte_size(int byte_size, const char *const str)
501 {
502     if (byte_size % sizeof(double)) {
503         fprintf(stderr, "%s is not a multiple "
504             "of sizeof(double)\n", str);
505         return EINVAL;
506     }
507     if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) {
508         fprintf(stderr, "The number of doubles contained in "
509             "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n",
510             str);
511         return EINVAL;
512     }
513     return 0;
514 }
515 
#ifdef HAVE_INTEL_SSE_INTRINSICS

#include <emmintrin.h>

/**
 * Sum an array of doubles using SSE2, DOUBLES_PER_LOOP_ITER doubles per
 * loop iteration spread over eight independent accumulators.
 *
 * @param buf          Data to sum.  It is read with _mm_load_pd, the
 *                     aligned-load intrinsic.
 * @param num_doubles  Element count; the loop advances in steps of
 *                     DOUBLES_PER_LOOP_ITER and reads a full step each
 *                     time.
 * @return             The sum of the elements.
 */
static double vecsum(const double *buf, int num_doubles)
{
    int pos;
    double upper, lower;
    __m128d pair01, pair23, pair45, pair67, quad_lo, quad_hi, total;
    __m128d acc0 = _mm_set_pd(0.0,0.0);
    __m128d acc1 = _mm_set_pd(0.0,0.0);
    __m128d acc2 = _mm_set_pd(0.0,0.0);
    __m128d acc3 = _mm_set_pd(0.0,0.0);
    __m128d acc4 = _mm_set_pd(0.0,0.0);
    __m128d acc5 = _mm_set_pd(0.0,0.0);
    __m128d acc6 = _mm_set_pd(0.0,0.0);
    __m128d acc7 = _mm_set_pd(0.0,0.0);

    for (pos = 0; pos < num_doubles; pos += DOUBLES_PER_LOOP_ITER) {
        const double *p = buf + pos;

        acc0 = _mm_add_pd(acc0, _mm_load_pd(p + 0));
        acc1 = _mm_add_pd(acc1, _mm_load_pd(p + 2));
        acc2 = _mm_add_pd(acc2, _mm_load_pd(p + 4));
        acc3 = _mm_add_pd(acc3, _mm_load_pd(p + 6));
        acc4 = _mm_add_pd(acc4, _mm_load_pd(p + 8));
        acc5 = _mm_add_pd(acc5, _mm_load_pd(p + 10));
        acc6 = _mm_add_pd(acc6, _mm_load_pd(p + 12));
        acc7 = _mm_add_pd(acc7, _mm_load_pd(p + 14));
    }
    /* Pairwise reduction of the eight accumulators down to one vector. */
    pair01 = _mm_add_pd(acc0, acc1);
    pair23 = _mm_add_pd(acc2, acc3);
    pair45 = _mm_add_pd(acc4, acc5);
    pair67 = _mm_add_pd(acc6, acc7);
    quad_lo = _mm_add_pd(pair01, pair23);
    quad_hi = _mm_add_pd(pair45, pair67);
    total = _mm_add_pd(quad_lo, quad_hi);
    /* Finally add the two lanes of the remaining vector. */
    _mm_storeh_pd(&upper, total);
    _mm_storel_pd(&lower, total);
    return upper + lower;
}

#else

/**
 * Portable fallback: sum an array of doubles front to back.
 *
 * @param buf          Data to sum.
 * @param num_doubles  Element count; nothing is read when it is <= 0.
 * @return             The sum of the elements (0.0 for an empty range).
 */
static double vecsum(const double *buf, int num_doubles)
{
    double total = 0.0;
    int remaining;

    for (remaining = num_doubles; remaining > 0; remaining--) {
        total += *buf++;
    }
    return total;
}

#endif
576 
vecsum_zcr_loop(int pass,struct libhdfs_data * ldata,struct hadoopRzOptions * zopts,const struct options * opts)577 static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata,
578         struct hadoopRzOptions *zopts,
579         const struct options *opts)
580 {
581     int32_t len;
582     double sum = 0.0;
583     const double *buf;
584     struct hadoopRzBuffer *rzbuf = NULL;
585     int ret;
586 
587     while (1) {
588         rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE);
589         if (!rzbuf) {
590             ret = errno;
591             fprintf(stderr, "hadoopReadZero failed with error "
592                 "code %d (%s)\n", ret, strerror(ret));
593             goto done;
594         }
595         buf = hadoopRzBufferGet(rzbuf);
596         if (!buf) break;
597         len = hadoopRzBufferLength(rzbuf);
598         if (len < ZCR_READ_CHUNK_SIZE) {
599             fprintf(stderr, "hadoopReadZero got a partial read "
600                 "of length %d\n", len);
601             ret = EINVAL;
602             goto done;
603         }
604         sum += vecsum(buf,
605             ZCR_READ_CHUNK_SIZE / sizeof(double));
606         hadoopRzBufferFree(ldata->file, rzbuf);
607     }
608     printf("finished zcr pass %d.  sum = %g\n", pass, sum);
609     ret = 0;
610 
611 done:
612     if (rzbuf)
613         hadoopRzBufferFree(ldata->file, rzbuf);
614     return ret;
615 }
616 
vecsum_zcr(struct libhdfs_data * ldata,const struct options * opts)617 static int vecsum_zcr(struct libhdfs_data *ldata,
618         const struct options *opts)
619 {
620     int ret, pass;
621     struct hadoopRzOptions *zopts = NULL;
622 
623     zopts = hadoopRzOptionsAlloc();
624     if (!zopts) {
625         fprintf(stderr, "hadoopRzOptionsAlloc failed.\n");
626         ret = ENOMEM;
627         goto done;
628     }
629     if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) {
630         ret = errno;
631         perror("hadoopRzOptionsSetSkipChecksum failed: ");
632         goto done;
633     }
634     if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) {
635         ret = errno;
636         perror("hadoopRzOptionsSetByteBufferPool failed: ");
637         goto done;
638     }
639     for (pass = 0; pass < opts->passes; ++pass) {
640         ret = vecsum_zcr_loop(pass, ldata, zopts, opts);
641         if (ret) {
642             fprintf(stderr, "vecsum_zcr_loop pass %d failed "
643                 "with error %d\n", pass, ret);
644             goto done;
645         }
646         hdfsSeek(ldata->fs, ldata->file, 0);
647     }
648     ret = 0;
649 done:
650     if (zopts)
651         hadoopRzOptionsFree(zopts);
652     return ret;
653 }
654 
/**
 * Read up to length bytes, calling hdfsRead() repeatedly until the request
 * is satisfied or a read returns 0.
 *
 * A read that fails with errno == EINTR is retried without touching the
 * running totals; any other failure aborts the read.
 *
 * @param fs      Filesystem handle.
 * @param f       Open file to read from.
 * @param buffer  Destination with room for at least length bytes.
 * @param length  Number of bytes wanted.
 * @return        The number of bytes read (less than length only when a
 *                read returned 0), or -1 on a non-EINTR error.
 */
tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    uint8_t *buf = buffer;
    tSize ret, nread = 0;

    while (length > 0) {
        ret = hdfsRead(fs, f, buf, length);
        if (ret < 0) {
            if (errno == EINTR) {
                /* Retry: a negative ret must never reach the counters. */
                continue;
            }
            return -1;
        }
        if (ret == 0) {
            break;
        }
        nread += ret;
        length -= ret;
        buf += ret;
    }
    return nread;
}
676 
vecsum_normal_loop(int pass,const struct libhdfs_data * ldata,const struct options * opts)677 static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata,
678             const struct options *opts)
679 {
680     double sum = 0.0;
681 
682     while (1) {
683         int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf,
684                 NORMAL_READ_CHUNK_SIZE);
685         if (res == 0) // EOF
686             break;
687         if (res < 0) {
688             int err = errno;
689             fprintf(stderr, "hdfsRead failed with error %d (%s)\n",
690                 err, strerror(err));
691             return err;
692         }
693         if (res < NORMAL_READ_CHUNK_SIZE) {
694             fprintf(stderr, "hdfsRead got a partial read of "
695                 "length %d\n", res);
696             return EINVAL;
697         }
698         sum += vecsum(ldata->buf,
699                   NORMAL_READ_CHUNK_SIZE / sizeof(double));
700     }
701     printf("finished normal pass %d.  sum = %g\n", pass, sum);
702     return 0;
703 }
704 
vecsum_libhdfs(struct libhdfs_data * ldata,const struct options * opts)705 static int vecsum_libhdfs(struct libhdfs_data *ldata,
706             const struct options *opts)
707 {
708     int pass;
709 
710     ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE);
711     if (!ldata->buf) {
712         fprintf(stderr, "failed to malloc buffer of size %d\n",
713             NORMAL_READ_CHUNK_SIZE);
714         return ENOMEM;
715     }
716     for (pass = 0; pass < opts->passes; ++pass) {
717         int ret = vecsum_normal_loop(pass, ldata, opts);
718         if (ret) {
719             fprintf(stderr, "vecsum_normal_loop pass %d failed "
720                 "with error %d\n", pass, ret);
721             return ret;
722         }
723         hdfsSeek(ldata->fs, ldata->file, 0);
724     }
725     return 0;
726 }
727 
vecsum_local(struct local_data * cdata,const struct options * opts)728 static void vecsum_local(struct local_data *cdata, const struct options *opts)
729 {
730     int pass;
731 
732     for (pass = 0; pass < opts->passes; pass++) {
733         double sum = vecsum(cdata->mmap, cdata->length / sizeof(double));
734         printf("finished vecsum_local pass %d.  sum = %g\n", pass, sum);
735     }
736 }
737 
vecsum_length(const struct options * opts,const struct libhdfs_data * ldata)738 static long long vecsum_length(const struct options *opts,
739                 const struct libhdfs_data *ldata)
740 {
741     if (opts->ty == VECSUM_LOCAL) {
742         struct stat st_buf = { 0 };
743         if (stat(opts->path, &st_buf)) {
744             int err = errno;
745             fprintf(stderr, "vecsum_length: stat(%s) failed: "
746                 "error %d (%s)\n", opts->path, err, strerror(err));
747             return -EIO;
748         }
749         return st_buf.st_size;
750     } else {
751         return ldata->length;
752     }
753 }
754 
755 /*
756  * vecsum is a microbenchmark which measures the speed of various ways of
757  * reading from HDFS.  It creates a file containing floating-point 'doubles',
758  * and computes the sum of all the doubles several times.  For some CPUs,
759  * assembly optimizations are used for the summation (SSE, etc).
760  */
main(void)761 int main(void)
762 {
763     int ret = 1;
764     struct options *opts = NULL;
765     struct local_data *cdata = NULL;
766     struct libhdfs_data *ldata = NULL;
767     struct stopwatch *watch = NULL;
768 
769     if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
770         check_byte_size(ZCR_READ_CHUNK_SIZE,
771                 "ZCR_READ_CHUNK_SIZE") ||
772         check_byte_size(NORMAL_READ_CHUNK_SIZE,
773                 "NORMAL_READ_CHUNK_SIZE")) {
774         goto done;
775     }
776     opts = options_create();
777     if (!opts)
778         goto done;
779     if (opts->ty == VECSUM_LOCAL) {
780         cdata = local_data_create(opts);
781         if (!cdata)
782             goto done;
783     } else {
784         ldata = libhdfs_data_create(opts);
785         if (!ldata)
786             goto done;
787     }
788     watch = stopwatch_create();
789     if (!watch)
790         goto done;
791     switch (opts->ty) {
792     case VECSUM_LOCAL:
793         vecsum_local(cdata, opts);
794         ret = 0;
795         break;
796     case VECSUM_LIBHDFS:
797         ret = vecsum_libhdfs(ldata, opts);
798         break;
799     case VECSUM_ZCR:
800         ret = vecsum_zcr(ldata, opts);
801         break;
802     }
803     if (ret) {
804         fprintf(stderr, "vecsum failed with error %d\n", ret);
805         goto done;
806     }
807     ret = 0;
808 done:
809     fprintf(stderr, "cleaning up...\n");
810     if (watch && (ret == 0)) {
811         long long length = vecsum_length(opts, ldata);
812         if (length >= 0) {
813             stopwatch_stop(watch, length * opts->passes);
814         }
815     }
816     if (cdata)
817         local_data_free(cdata);
818     if (ldata)
819         libhdfs_data_free(ldata);
820     if (opts)
821         options_free(opts);
822     return ret;
823 }
824 
825 // vim: ts=4:sw=4:tw=79:et
826