1 /*
2  * libhdfs engine
3  *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file is created.
 *
 * To work around that, each fio file is backed by many small HDFS files
 * ("chunks", e.g. 256k each), and this engine selects the chunk to use
 * based on the offset generated by fio.
 *
 * Thus, random reads, and random writes at chunk-size granularity, can also
 * be achieved with this logic.
11  *
12  */
13 
14 #include <math.h>
15 #include <hdfs.h>
16 
17 #include "../fio.h"
18 #include "../optgroup.h"
19 
/*
 * Size limits used by this engine:
 *  - CHUNCK_NAME_LENGTH_MAX: size of the buffer that holds a chunk file name,
 *    including the terminating NUL;
 *  - CHUNCK_CREATION_BUFFER_SIZE: size of the zero-filled piece written
 *    repeatedly when pre-creating chunk files.
 */
enum {
	CHUNCK_NAME_LENGTH_MAX		= 80,
	CHUNCK_CREATION_BUFFER_SIZE	= 65536,
};
22 
/* Engine state kept in td->io_ops_data (allocated by fio_hdfsio_setup()). */
struct hdfsio_data {
	hdfsFS fs;		/* connection from hdfsBuilderConnect() in init */
	hdfsFile fp;		/* handle of the chunk currently open, if any */
	uint64_t curr_file_id;	/* index of the open chunk; -1 when none is open */
};
28 
/*
 * Engine-specific options. fio fills these in from the options[] table
 * below, which addresses each member through offsetof().
 */
struct hdfsio_options {
	void *pad;			/* reserves offset 0: an option defined with offsetof() cannot use offset 0 */
	char *host;			/* namenode host ("namenode" / "hostname") */
	char *directory;		/* HDFS working directory ("hdfsdirectory") */
	unsigned int port;		/* namenode port ("port") */
	unsigned int chunck_size;	/* bytes per chunk file ("chunk_size") */
	unsigned int single_instance;	/* when 0, force a new libhdfs instance ("single_instance") */
	unsigned int use_direct;	/* read with readDirect() instead of hdfsRead() ("hdfs_use_direct") */
};
38 
39 static struct fio_option options[] = {
40 	{
41 		.name	= "namenode",
42 		.lname	= "hfds namenode",
43 		.type	= FIO_OPT_STR_STORE,
44 		.off1   = offsetof(struct hdfsio_options, host),
45 		.def    = "localhost",
46 		.help	= "Namenode of the HDFS cluster",
47 		.category = FIO_OPT_C_ENGINE,
48 		.group	= FIO_OPT_G_HDFS,
49 	},
50 	{
51 		.name	= "hostname",
52 		.lname	= "hfds namenode",
53 		.type	= FIO_OPT_STR_STORE,
54 		.off1   = offsetof(struct hdfsio_options, host),
55 		.def    = "localhost",
56 		.help	= "Namenode of the HDFS cluster",
57 		.category = FIO_OPT_C_ENGINE,
58 		.group	= FIO_OPT_G_HDFS,
59 	},
60 	{
61 		.name	= "port",
62 		.lname	= "hdfs namenode port",
63 		.type	= FIO_OPT_INT,
64 		.off1	= offsetof(struct hdfsio_options, port),
65 		.def    = "9000",
66 		.minval	= 1,
67 		.maxval	= 65535,
68 		.help	= "Port used by the HDFS cluster namenode",
69 		.category = FIO_OPT_C_ENGINE,
70 		.group	= FIO_OPT_G_HDFS,
71 	},
72 	{
73 		.name	= "hdfsdirectory",
74 		.lname	= "hfds directory",
75 		.type	= FIO_OPT_STR_STORE,
76 		.off1   = offsetof(struct hdfsio_options, directory),
77 		.def    = "/",
78 		.help	= "The HDFS directory where fio will create chunks",
79 		.category = FIO_OPT_C_ENGINE,
80 		.group	= FIO_OPT_G_HDFS,
81 	},
82 	{
83 		.name	= "chunk_size",
84 		.alias	= "chunck_size",
85 		.lname	= "Chunk size",
86 		.type	= FIO_OPT_INT,
87 		.off1	= offsetof(struct hdfsio_options, chunck_size),
88 		.def    = "1048576",
89 		.help	= "Size of individual chunk",
90 		.category = FIO_OPT_C_ENGINE,
91 		.group	= FIO_OPT_G_HDFS,
92 	},
93 	{
94 		.name	= "single_instance",
95 		.lname	= "Single Instance",
96 		.type	= FIO_OPT_BOOL,
97 		.off1	= offsetof(struct hdfsio_options, single_instance),
98 		.def    = "1",
99 		.help	= "Use a single instance",
100 		.category = FIO_OPT_C_ENGINE,
101 		.group	= FIO_OPT_G_HDFS,
102 	},
103 	{
104 		.name	= "hdfs_use_direct",
105 		.lname	= "HDFS Use Direct",
106 		.type	= FIO_OPT_BOOL,
107 		.off1	= offsetof(struct hdfsio_options, use_direct),
108 		.def    = "0",
109 		.help	= "Use readDirect instead of hdfsRead",
110 		.category = FIO_OPT_C_ENGINE,
111 		.group	= FIO_OPT_G_HDFS,
112 	},
113 	{
114 		.name	= NULL,
115 	},
116 };
117 
118 
get_chunck_name(char * dest,char * file_name,uint64_t chunk_id)119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120 	return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
121 }
122 
fio_hdfsio_prep(struct thread_data * td,struct io_u * io_u)123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
124 {
125 	struct hdfsio_options *options = td->eo;
126 	struct hdfsio_data *hd = td->io_ops_data;
127 	unsigned long f_id;
128 	char fname[CHUNCK_NAME_LENGTH_MAX];
129 	int open_flags;
130 
131 	/* find out file id based on the offset generated by fio */
132 	f_id = floor(io_u->offset / options-> chunck_size);
133 
134 	if (f_id == hd->curr_file_id) {
135 		/* file is already open */
136 		return 0;
137 	}
138 
139 	if (hd->curr_file_id != -1) {
140 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
142 			return errno;
143 		}
144 		hd->curr_file_id = -1;
145 	}
146 
147 	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
148 		open_flags = O_RDONLY;
149 	} else if (io_u->ddir == DDIR_WRITE) {
150 		open_flags = O_WRONLY;
151 	} else {
152 		log_err("hdfs: Invalid I/O Operation\n");
153 		return 0;
154 	}
155 
156 	get_chunck_name(fname, io_u->file->file_name, f_id);
157 	hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158 			      options->chunck_size);
159 	if(hd->fp == NULL) {
160 		log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
161 		return errno;
162 	}
163 	hd->curr_file_id = f_id;
164 
165 	return 0;
166 }
167 
fio_hdfsio_queue(struct thread_data * td,struct io_u * io_u)168 static enum fio_q_status fio_hdfsio_queue(struct thread_data *td,
169 					  struct io_u *io_u)
170 {
171 	struct hdfsio_data *hd = td->io_ops_data;
172 	struct hdfsio_options *options = td->eo;
173 	int ret;
174 	unsigned long offset;
175 
176 	offset = io_u->offset % options->chunck_size;
177 
178 	if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
179 	     hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
180 		log_err("hdfs: seek failed: %s, are you doing random write smaller than chunk size ?\n", strerror(errno));
181 		io_u->error = errno;
182 		return FIO_Q_COMPLETED;
183 	};
184 
185 	// do the IO
186 	if (io_u->ddir == DDIR_READ) {
187 		if (options->use_direct) {
188 			ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
189 		} else {
190 			ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
191 		}
192 	} else if (io_u->ddir == DDIR_WRITE) {
193 		ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
194 				io_u->xfer_buflen);
195 	} else if (io_u->ddir == DDIR_SYNC) {
196 		ret = hdfsFlush(hd->fs, hd->fp);
197 	} else {
198 		log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
199 		ret = EINVAL;
200 	}
201 
202 	// Check if the IO went fine, or is incomplete
203 	if (ret != (int)io_u->xfer_buflen) {
204 		if (ret >= 0) {
205 			io_u->resid = io_u->xfer_buflen - ret;
206 			io_u->error = 0;
207 			return FIO_Q_COMPLETED;
208 		} else {
209 			io_u->error = errno;
210 		}
211 	}
212 
213 	if (io_u->error)
214 		td_verror(td, io_u->error, "xfer");
215 
216 	return FIO_Q_COMPLETED;
217 }
218 
fio_hdfsio_open_file(struct thread_data * td,struct fio_file * f)219 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
220 {
221 	if (td->o.odirect) {
222 		td->error = EINVAL;
223 		return 0;
224 	}
225 
226 	return 0;
227 }
228 
fio_hdfsio_close_file(struct thread_data * td,struct fio_file * f)229 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
230 {
231 	struct hdfsio_data *hd = td->io_ops_data;
232 
233 	if (hd->curr_file_id != -1) {
234 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
235 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
236 			return errno;
237 		}
238 		hd->curr_file_id = -1;
239 	}
240 	return 0;
241 }
242 
fio_hdfsio_io_u_init(struct thread_data * td,struct io_u * io_u)243 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
244 {
245 	struct hdfsio_options *options = td->eo;
246 	struct hdfsio_data *hd = td->io_ops_data;
247 	struct fio_file *f;
248 	uint64_t j,k;
249 	int i, failure = 0;
250 	uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
251 	uint64_t bytes_left;
252 	char fname[CHUNCK_NAME_LENGTH_MAX];
253 	hdfsFile fp;
254 	hdfsFileInfo *fi;
255 	tOffset fi_size;
256 
257 	for_each_file(td, f, i) {
258 		k = 0;
259 		for(j=0; j < f->real_file_size; j += options->chunck_size) {
260 			get_chunck_name(fname, f->file_name, k++);
261 			fi = hdfsGetPathInfo(hd->fs, fname);
262 			fi_size = fi ? fi->mSize : 0;
263 			// fill exist and is big enough, nothing to do
264 			if( fi && fi_size >= options->chunck_size) {
265 				continue;
266 			}
267 			fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
268 					  options->chunck_size);
269 			if(fp == NULL) {
270 				failure = errno;
271 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
272 				break;
273 			}
274 			bytes_left = options->chunck_size;
275 			memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
276 			while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
277 				if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
278 				    != CHUNCK_CREATION_BUFFER_SIZE) {
279     					failure = errno;
280 	    				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
281 					break;
282 				};
283 				bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
284 			}
285 			if(bytes_left > 0) {
286 				if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
287 				    != bytes_left) {
288 					failure = errno;
289 					break;
290 				};
291 			}
292 			if( hdfsCloseFile(hd->fs, fp) != 0) {
293 				failure = errno;
294 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
295 				break;
296 			}
297 		}
298 		if(failure) {
299 			break;
300 		}
301 	}
302 
303 	if( !failure ) {
304 		fio_file_set_size_known(f);
305 	}
306 
307 	return failure;
308 }
309 
fio_hdfsio_setup(struct thread_data * td)310 static int fio_hdfsio_setup(struct thread_data *td)
311 {
312 	struct hdfsio_data *hd;
313 	struct fio_file *f;
314 	int i;
315 	uint64_t file_size, total_file_size;
316 
317 	if (!td->io_ops_data) {
318 		hd = malloc(sizeof(*hd));
319 		memset(hd, 0, sizeof(*hd));
320 
321 		hd->curr_file_id = -1;
322 
323 		td->io_ops_data = hd;
324 	}
325 
326 	total_file_size = 0;
327 	file_size = 0;
328 
329 	for_each_file(td, f, i) {
330 		if(!td->o.file_size_low) {
331 			file_size = floor(td->o.size / td->o.nr_files);
332 			total_file_size += file_size;
333 		}
334 		else if (td->o.file_size_low == td->o.file_size_high)
335 			file_size = td->o.file_size_low;
336 		else {
337 			file_size = get_rand_file_size(td);
338 		}
339 		f->real_file_size = file_size;
340 	}
341 	/* If the size doesn't divide nicely with the chunk size,
342 	 * make the last files bigger.
343 	 * Used only if filesize was not explicitly given
344 	 */
345 	if (!td->o.file_size_low && total_file_size < td->o.size) {
346 		f->real_file_size += (td->o.size - total_file_size);
347 	}
348 
349 	return 0;
350 }
351 
fio_hdfsio_init(struct thread_data * td)352 static int fio_hdfsio_init(struct thread_data *td)
353 {
354 	struct hdfsio_data *hd = td->io_ops_data;
355 	struct hdfsio_options *options = td->eo;
356 	int failure;
357 	struct hdfsBuilder *bld;
358 
359 	if (options->host == NULL || options->port == 0) {
360 		log_err("hdfs: server not defined\n");
361 		return EINVAL;
362 	}
363 
364 	bld = hdfsNewBuilder();
365 	if (!bld) {
366 		failure = errno;
367 		log_err("hdfs: unable to allocate connect builder\n");
368 		return failure;
369 	}
370 	hdfsBuilderSetNameNode(bld, options->host);
371 	hdfsBuilderSetNameNodePort(bld, options->port);
372 	if(! options->single_instance) {
373 		hdfsBuilderSetForceNewInstance(bld);
374 	}
375 	hd->fs = hdfsBuilderConnect(bld);
376 
377 	/* hdfsSetWorkingDirectory succeed on non-existent directory */
378 	if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
379 		failure = errno;
380 		log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
381 		return failure;
382 	}
383 
384 	return 0;
385 }
386 
fio_hdfsio_io_u_free(struct thread_data * td,struct io_u * io_u)387 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
388 {
389 	struct hdfsio_data *hd = td->io_ops_data;
390 
391 	if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
392 		log_err("hdfs: disconnect failed: %d\n", errno);
393 	}
394 }
395 
396 FIO_STATIC struct ioengine_ops ioengine = {
397 	.name = "libhdfs",
398 	.version = FIO_IOOPS_VERSION,
399 	.flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
400 	.setup = fio_hdfsio_setup,
401 	.init = fio_hdfsio_init,
402 	.prep = fio_hdfsio_prep,
403 	.queue = fio_hdfsio_queue,
404 	.open_file = fio_hdfsio_open_file,
405 	.close_file = fio_hdfsio_close_file,
406 	.io_u_init = fio_hdfsio_io_u_init,
407 	.io_u_free = fio_hdfsio_io_u_free,
408 	.option_struct_size	= sizeof(struct hdfsio_options),
409 	.options		= options,
410 };
411 
412 
fio_hdfsio_register(void)413 static void fio_init fio_hdfsio_register(void)
414 {
415 	register_ioengine(&ioengine);
416 }
417 
fio_hdfsio_unregister(void)418 static void fio_exit fio_hdfsio_unregister(void)
419 {
420 	unregister_ioengine(&ioengine);
421 }
422