1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2014-2017 Planets Communications B.V.
5 Copyright (C) 2014-2018 Bareos GmbH & Co. KG
6
7 This program is Free Software; you can redistribute it and/or
8 modify it under the terms of version three of the GNU Affero General Public
9 License as published by the Free Software Foundation and included
10 in the file LICENSE.
11
12 This program is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Affero General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20 02110-1301, USA.
21 */
22 /*
23 * Marco van Wieringen, February 2014
24 *
25 * Object Storage API device abstraction.
26 *
27 * Stacking is the following:
28 *
29 * droplet_device::
30 * |
31 * v
32 * chunked_device::
33 * |
34 * v
35 * Device::
36 *
37 */
38 /**
39 * @file
40 * Object Storage API device abstraction.
41 */
42
43 #include "include/bareos.h"
44
45 #ifdef HAVE_DROPLET
46 #include "stored/stored.h"
47 #include "chunked_device.h"
48 #include "droplet_device.h"
49 #include "lib/edit.h"
50
51 namespace storagedaemon {
52
/**
 * Options that can be specified for this device type.
 *
 * Every enumerator except argument_none corresponds to one keyword in the
 * device_options table and selects the matching branch of the option parser
 * in droplet_device::initialize().
 */
enum device_option_type {
   argument_none = 0,      /* used by the terminating entry of device_options */
   argument_profile,       /* "profile=" */
   argument_location,      /* "location=" */
   argument_canned_acl,    /* "acl=" */
   argument_storage_class, /* "storageclass=" */
   argument_bucket,        /* "bucket=" */
   argument_chunksize,     /* "chunksize=" */
   argument_iothreads,     /* "iothreads=" */
   argument_ioslots,       /* "ioslots=" */
   argument_retries,       /* "retries=" */
   argument_mmap           /* "mmap" (flag, takes no value) */
};
69
/*
 * One entry of the device option keyword table.
 */
struct device_option {
   const char *name;              /* keyword text; NULL marks the end of the table */
   enum device_option_type type;  /* which option this keyword selects */
   int compare_size;              /* number of leading characters compared against the keyword */
};
75
76 static device_option device_options[] = {
77 { "profile=", argument_profile, 8 },
78 { "location=", argument_location, 9 },
79 { "acl=", argument_canned_acl, 4 },
80 { "storageclass=", argument_storage_class, 13 },
81 { "bucket=", argument_bucket, 7 },
82 { "chunksize=", argument_chunksize, 10 },
83 { "iothreads=", argument_iothreads, 10 },
84 { "ioslots=", argument_ioslots, 8 },
85 { "retries=", argument_retries, 8 },
86 { "mmap", argument_mmap, 4 },
87 { NULL, argument_none }
88 };
89
/*
 * Counter used to decide when dpl_init() and dpl_free() are called:
 * droplet_device::initialize() calls dpl_init() while the counter is zero and
 * then increments it, the destructor decrements it and calls dpl_free() when
 * it drops back to zero. All accesses are made with the mutex below held.
 */
static int droplet_reference_count = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
92
93 /**
94 * Generic log function that glues libdroplet with BAREOS.
95 */
DropletDeviceLogfunc(dpl_ctx_t * ctx,dpl_log_level_t level,const char * message)96 static void DropletDeviceLogfunc(dpl_ctx_t *ctx, dpl_log_level_t level, const char *message)
97 {
98 switch (level) {
99 case DPL_DEBUG:
100 Dmsg1(100, "%s\n", message);
101 break;
102 case DPL_INFO:
103 Emsg1(M_INFO, 0, "%s\n", message);
104 break;
105 case DPL_WARNING:
106 Emsg1(M_WARNING, 0, "%s\n", message);
107 break;
108 case DPL_ERROR:
109 Emsg1(M_ERROR, 0, "%s\n", message);
110 break;
111 default:
112 break;
113 }
114 }
115
116 /**
117 * Map the droplet errno's to system ones.
118 */
DropletErrnoToSystemErrno(dpl_status_t status)119 static inline int DropletErrnoToSystemErrno(dpl_status_t status)
120 {
121 switch (status) {
122 case DPL_ENOENT:
123 errno = ENOENT;
124 break;
125 case DPL_ETIMEOUT:
126 errno = ETIMEDOUT;
127 case DPL_ENOMEM:
128 errno = ENOMEM;
129 break;
130 case DPL_EIO:
131 errno = EIO;
132 break;
133 case DPL_ENAMETOOLONG:
134 errno = ENAMETOOLONG;
135 break;
136 case DPL_ENOTDIR:
137 errno = ENOTDIR;
138 break;
139 case DPL_ENOTEMPTY:
140 errno = ENOTEMPTY;
141 break;
142 case DPL_EISDIR:
143 errno = EISDIR;
144 break;
145 case DPL_EEXIST:
146 errno = EEXIST;
147 break;
148 case DPL_EPERM:
149 errno = EPERM;
150 break;
151 case DPL_FAILURE: /**< General failure */
152 errno = EIO;
153 break;
154 default:
155 errno = EINVAL;
156 break;
157 }
158
159 return errno;
160 }
161
162
163
164 /*
165 * Callback for getting the total size of a chunked volume.
166 */
chunked_volume_size_callback(dpl_sysmd_t * sysmd,dpl_ctx_t * ctx,const char * chunkpath,void * data)167 static dpl_status_t chunked_volume_size_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
168 {
169 dpl_status_t status = DPL_SUCCESS;
170 ssize_t *volumesize = (ssize_t *)data;
171
172 *volumesize = *volumesize + sysmd->size;
173
174 return status;
175 }
176
177 /*
178 * Callback for truncating a chunked volume.
179 *
180 * @return DPL_SUCCESS on success, on error: a dpl_status_t value that represents the error.
181 */
chunked_volume_truncate_callback(dpl_sysmd_t * sysmd,dpl_ctx_t * ctx,const char * chunkpath,void * data)182 static dpl_status_t chunked_volume_truncate_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
183 {
184 dpl_status_t status = DPL_SUCCESS;
185
186 status = dpl_unlink(ctx, chunkpath);
187
188 switch (status) {
189 case DPL_SUCCESS:
190 break;
191 default:
192 /* no error message here, as error will be set by calling function. */
193 return status;
194 }
195
196 return status;
197 }
198
199
200 /*
201 * Generic function that walks a dirname and calls the callback
202 * function for each entry it finds in that directory.
203 *
204 * @return: true - if no error occured
205 * false - if an error has occured. Sets dev_errno and errmsg to the first error.
206 */
walk_chunks(const char * dirname,t_dpl_walk_chunks_call_back callback,void * data,bool ignore_gaps)207 bool droplet_device::walk_chunks(const char *dirname, t_dpl_walk_chunks_call_back callback, void *data, bool ignore_gaps)
208 {
209 bool retval = true;
210 dpl_status_t status;
211 dpl_status_t callback_status;
212 dpl_sysmd_t *sysmd = NULL;
213 PoolMem path(PM_NAME);
214
215 sysmd = dpl_sysmd_dup(&sysmd_);
216 bool found = true;
217 int i = 0;
218 while ((i < max_chunks_) && (found) && (retval)) {
219 path.bsprintf("%s/%04d", dirname, i);
220
221 status = dpl_getattr(ctx_, /* context */
222 path.c_str(), /* locator */
223 NULL, /* metadata */
224 sysmd); /* sysmd */
225
226 switch (status) {
227 case DPL_SUCCESS:
228 Dmsg1(100, "chunk %s exists. Calling callback.\n", path.c_str());
229 callback_status = callback(sysmd, ctx_, path.c_str(), data);
230 if (callback_status == DPL_SUCCESS) {
231 i++;
232 } else {
233 Mmsg2(errmsg, _("Operation failed on chunk %s: ERR=%s."),
234 path.c_str(), dpl_status_str(callback_status));
235 dev_errno = DropletErrnoToSystemErrno(callback_status);
236 /* exit loop */
237 retval = false;
238 }
239 break;
240 case DPL_ENOENT:
241 if (ignore_gaps) {
242 Dmsg1(1000, "chunk %s does not exists. Skipped.\n", path.c_str());
243 i++;
244 } else {
245 Dmsg1(100, "chunk %s does not exists. Exiting.\n", path.c_str());
246 found = false;
247 }
248 break;
249 default:
250 Dmsg2(100, "chunk %s failure: %s. Exiting.\n", path.c_str(), dpl_status_str(callback_status));
251 found = false;
252 break;
253 }
254 }
255
256 if (sysmd) {
257 dpl_sysmd_free(sysmd);
258 sysmd = NULL;
259 }
260
261 return retval;
262 }
263
264 /**
265 * Check if a specific path exists.
266 * It uses dpl_getattr() for this.
 * However, dpl_getattr() returns wrong results in a couple of situations;
 * especially directory names should not be checked using a prepended "/".
269 *
270 * Results in detail:
271 *
272 * path | "name" | "name/" | target reachable | target not reachable | target not reachable | wrong credentials
273 * | exists | exists | | (already initialized) | (not initialized) |
274 * -------------------------------------------------------------------------------------------------------------------
275 * "" | - | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_FAILURE | DPL_EPERM
276 * "/" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
277 *
278 * "name" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
279 * "/name" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
280 * "name/" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
281 * "/name/" | - | - | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
282 *
283 * "name" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
284 * "/name" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
285 * "name/" | yes | - | DPL_ENOTDIR | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
286 * "/name/" | yes | - | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
287 *
288 * "name" | - | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
289 * "/name" | - | yes | DPL_ENOENT (!) | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
290 * "name/" | - | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
291 * "/name/" | - | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
292 *
293 * "name" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
294 * "/name" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
295 * "name/" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
296 * "/name/" | yes | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
297 *
298 * Best test for
299 * directories as "dir/"
300 * files as "file" or "/file".
301 *
302 * Returns DPL_SUCCESS - if path exists and can be accessed
303 * DPL_* errorcode - otherwise
304 */
check_path(const char * path)305 dpl_status_t droplet_device::check_path(const char *path)
306 {
307 dpl_status_t status;
308 dpl_sysmd_t *sysmd = NULL;
309
310 sysmd = dpl_sysmd_dup(&sysmd_);
311 status = dpl_getattr(ctx_, /* context */
312 path, /* locator */
313 NULL, /* metadata */
314 sysmd); /* sysmd */
315 Dmsg4(100, "check_path(device=%s, bucket=%s, path=%s): %s\n", prt_name, ctx_->cur_bucket, path, dpl_status_str(status));
316 dpl_sysmd_free(sysmd);
317
318 return status;
319 }
320
321
322
323 /**
324 * Checks if the connection to the backend storage system is possible.
325 *
326 * Returns true - if connection can be established
327 * false - otherwise
328 */
CheckRemote()329 bool droplet_device::CheckRemote()
330 {
331 if (!ctx_) {
332 if (!initialize()) {
333 return false;
334 }
335 }
336
337 if (check_path("/") != DPL_SUCCESS) {
338 Dmsg1(100, "CheckRemote(%s): failed\n", prt_name);
339 return false;
340 }
341
342 Dmsg1(100, "CheckRemote(%s): ok\n", prt_name);
343
344 return true;
345 }
346
347
348
remote_chunked_volume_exists()349 bool droplet_device::remote_chunked_volume_exists()
350 {
351 bool retval = false;
352 dpl_status_t status;
353 PoolMem chunk_dir(PM_FNAME);
354
355 if (!CheckRemote()) {
356 return false;
357 }
358
359 Mmsg(chunk_dir, "%s/", getVolCatName());
360 status = check_path(chunk_dir.c_str());
361
362 switch (status) {
363 case DPL_SUCCESS:
364 Dmsg1(100, "remote_chunked_volume %s exists\n", chunk_dir.c_str());
365 retval = true;
366 break;
367 case DPL_ENOENT:
368 case DPL_FAILURE:
369 default:
370 Dmsg1(100, "remote_chunked_volume %s does not exists\n", chunk_dir.c_str());
371 break;
372 }
373
374 return retval;
375 }
376
377
/*
 * Internal method for flushing a chunk to the backing store.
 * This does the real work either by being called from a
 * io-thread or directly blocking the device.
 *
 * The chunk is stored as "/<volname>/<chunk number, 4 digits>". The volume
 * directory is created when it cannot be found.
 *
 * @param request  chunk to upload (volume name, chunk number, buffer and
 *                 number of bytes to write)
 *
 * @return true  - the chunk was uploaded, or the remote chunk is already
 *                 bigger than the data in the request (nothing uploaded)
 *         false - the chunk could not be marked inflight, or dpl_mkdir() or
 *                 dpl_fput() failed; for the latter two errmsg and dev_errno
 *                 are set.
 */
bool droplet_device::FlushRemoteChunk(chunk_io_request *request)
{
   bool retval = false;
   dpl_status_t status;
   dpl_option_t dpl_options;
   dpl_sysmd_t *sysmd = NULL;
   PoolMem chunk_dir(PM_FNAME),
           chunk_name(PM_FNAME);

   Mmsg(chunk_dir, "/%s", request->volname);
   Mmsg(chunk_name, "%s/%04d", chunk_dir.c_str(), request->chunk);

   /*
    * Set that we are uploading the chunk.
    */
   if (!SetInflightChunk(request)) {
      goto bail_out;
   }

   Dmsg1(100, "Flushing chunk %s\n", chunk_name.c_str());

   /*
    * Check on the remote backing store if the chunk already exists.
    * We skip the upload when the chunk that exists on the remote backing
    * store is bigger than this one. When using io-threads it could happen
    * that there are multiple flush requests for the same chunk when a
    * chunk is reused in a next backup job. We only want the chunk with
    * the biggest amount of valid data to persist as we only append to
    * chunks.
    */
   sysmd = dpl_sysmd_dup(&sysmd_);
   status = dpl_getattr(ctx_, /* context */
                        chunk_name.c_str(), /* locator */
                        NULL, /* metadata */
                        sysmd); /* sysmd */

   switch (status) {
   case DPL_SUCCESS:
      if (sysmd->size > request->wbuflen) {
         /* Remote copy holds more data, keep it and report success. */
         retval = true;
         goto bail_out;
      }
      break;
   default:
      /*
       * Check on the remote backing store if the chunkdir exists.
       * Each libdroplet call gets a fresh copy of the device wide sysmd.
       */
      dpl_sysmd_free(sysmd);
      sysmd = dpl_sysmd_dup(&sysmd_);
      status = dpl_getattr(ctx_, /* context */
                           chunk_dir.c_str(), /* locator */
                           NULL, /* metadata */
                           sysmd); /* sysmd */

      switch (status) {
      case DPL_SUCCESS:
         break;
      case DPL_ENOENT:
      case DPL_FAILURE:
         /*
          * Make sure the chunk directory with the name of the volume exists.
          */
         dpl_sysmd_free(sysmd);
         sysmd = dpl_sysmd_dup(&sysmd_);
         status = dpl_mkdir(ctx_, /* context */
                            chunk_dir.c_str(), /* locator */
                            NULL, /* metadata */
                            sysmd);/* sysmd */

         switch (status) {
         case DPL_SUCCESS:
            break;
         default:
            Mmsg2(errmsg, _("Failed to create directory %s using dpl_mkdir(): ERR=%s.\n"),
                  chunk_dir.c_str(), dpl_status_str(status));
            dev_errno = DropletErrnoToSystemErrno(status);
            goto bail_out;
         }
         break;
      default:
         /* Any other status is ignored here; the upload below is still attempted. */
         break;
      }
      break;
   }

   /*
    * Create some options for libdroplet.
    *
    * DPL_OPTION_NOALLOC - we provide the buffer to copy the data into
    *                      no need to let the library allocate memory we
    *                      need to free after copying the data.
    */
   memset(&dpl_options, 0, sizeof(dpl_options));
   dpl_options.mask |= DPL_OPTION_NOALLOC;

   dpl_sysmd_free(sysmd);
   sysmd = dpl_sysmd_dup(&sysmd_);
   status = dpl_fput(ctx_, /* context */
                     chunk_name.c_str(), /* locator */
                     &dpl_options, /* options */
                     NULL, /* condition */
                     NULL, /* range */
                     NULL, /* metadata */
                     sysmd, /* sysmd */
                     (char *)request->buffer, /* data_buf */
                     request->wbuflen); /* data_len */

   switch (status) {
   case DPL_SUCCESS:
      break;
   default:
      Mmsg2(errmsg, _("Failed to flush %s using dpl_fput(): ERR=%s.\n"),
            chunk_name.c_str(), dpl_status_str(status));
      dev_errno = DropletErrnoToSystemErrno(status);
      goto bail_out;
   }

   retval = true;

bail_out:
   /*
    * Clear that we are uploading the chunk.
    * This is reached on every path, also when SetInflightChunk() failed.
    */
   ClearInflightChunk(request);

   if (sysmd) {
      dpl_sysmd_free(sysmd);
   }

   return retval;
}
514
515 /*
516 * Internal method for reading a chunk from the remote backing store.
517 */
ReadRemoteChunk(chunk_io_request * request)518 bool droplet_device::ReadRemoteChunk(chunk_io_request *request)
519 {
520 bool retval = false;
521 dpl_status_t status;
522 dpl_option_t dpl_options;
523 dpl_range_t dpl_range;
524 dpl_sysmd_t *sysmd = NULL;
525 PoolMem chunk_name(PM_FNAME);
526
527 Mmsg(chunk_name, "/%s/%04d", request->volname, request->chunk);
528 Dmsg1(100, "Reading chunk %s\n", chunk_name.c_str());
529
530 /*
531 * See if chunk exists.
532 */
533 sysmd = dpl_sysmd_dup(&sysmd_);
534 status = dpl_getattr(ctx_, /* context */
535 chunk_name.c_str(), /* locator */
536 NULL, /* metadata */
537 sysmd); /* sysmd */
538
539 switch (status) {
540 case DPL_SUCCESS:
541 break;
542 default:
543 Mmsg1(errmsg, _("Failed to open %s doesn't exist\n"), chunk_name.c_str());
544 Dmsg1(100, "%s", errmsg);
545 dev_errno = EIO;
546 goto bail_out;
547 }
548
549 if (sysmd->size > request->wbuflen) {
550 Mmsg3(errmsg, _("Failed to read %s (%ld) to big to fit in chunksize of %ld bytes\n"),
551 chunk_name.c_str(), sysmd->size, request->wbuflen);
552 Dmsg1(100, "%s", errmsg);
553 dev_errno = EINVAL;
554 goto bail_out;
555 }
556
557 /*
558 * Create some options for libdroplet.
559 *
560 * DPL_OPTION_NOALLOC - we provide the buffer to copy the data into
561 * no need to let the library allocate memory we
562 * need to free after copying the data.
563 */
564 memset(&dpl_options, 0, sizeof(dpl_options));
565 dpl_options.mask |= DPL_OPTION_NOALLOC;
566
567 dpl_range.start = 0;
568 dpl_range.end = sysmd->size;
569 *request->rbuflen = sysmd->size;
570 dpl_sysmd_free(sysmd);
571 sysmd = dpl_sysmd_dup(&sysmd_);
572 status = dpl_fget(ctx_, /* context */
573 chunk_name.c_str(), /* locator */
574 &dpl_options, /* options */
575 NULL, /* condition */
576 &dpl_range, /* range */
577 (char **)&request->buffer, /* data_bufp */
578 request->rbuflen, /* data_lenp */
579 NULL, /* metadatap */
580 sysmd); /* sysmdp */
581
582 switch (status) {
583 case DPL_SUCCESS:
584 break;
585 case DPL_ENOENT:
586 Mmsg1(errmsg, _("Failed to open %s doesn't exist\n"), chunk_name.c_str());
587 Dmsg1(100, "%s", errmsg);
588 dev_errno = EIO;
589 goto bail_out;
590 default:
591 Mmsg2(errmsg, _("Failed to read %s using dpl_fget(): ERR=%s.\n"),
592 chunk_name.c_str(), dpl_status_str(status));
593 dev_errno = DropletErrnoToSystemErrno(status);
594 goto bail_out;
595 }
596
597 retval = true;
598
599 bail_out:
600 if (sysmd) {
601 dpl_sysmd_free(sysmd);
602 }
603
604 return retval;
605 }
606
607 /*
608 * Internal method for truncating a chunked volume on the remote backing store.
609 */
TruncateRemoteChunkedVolume(DeviceControlRecord * dcr)610 bool droplet_device::TruncateRemoteChunkedVolume(DeviceControlRecord *dcr)
611 {
612 PoolMem chunk_dir(PM_FNAME);
613
614 Dmsg1(100, "truncate_remote_chunked_volume(%s) start.\n", getVolCatName());
615 Mmsg(chunk_dir, "/%s", getVolCatName());
616 bool ignore_gaps = true;
617 if (!walk_chunks(chunk_dir.c_str(), chunked_volume_truncate_callback, NULL, ignore_gaps)) {
618 /* errno already set in walk_chunks. */
619 return false;
620 }
621 Dmsg1(100, "truncate_remote_chunked_volume(%s) finished.\n", getVolCatName());
622
623 return true;
624 }
625
626
d_flush(DeviceControlRecord * dcr)627 bool droplet_device::d_flush(DeviceControlRecord *dcr)
628 {
629 return WaitUntilChunksWritten();
630 };
631
632 /*
633 * Initialize backend.
634 */
initialize()635 bool droplet_device::initialize()
636 {
637 dpl_status_t status;
638
639 /*
640 * Initialize the droplet library when its not done previously.
641 */
642 P(mutex);
643 if (droplet_reference_count == 0) {
644 dpl_set_log_func(DropletDeviceLogfunc);
645
646 status = dpl_init();
647 switch (status) {
648 case DPL_SUCCESS:
649 break;
650 default:
651 V(mutex);
652 goto bail_out;
653 }
654 }
655 droplet_reference_count++;
656 V(mutex);
657
658 if (!configstring_) {
659 int len;
660 bool done;
661 uint64_t value;
662 char *bp, *next_option;
663
664 if (!dev_options) {
665 Mmsg0(errmsg, _("No device options configured\n"));
666 Emsg0(M_FATAL, 0, errmsg);
667 return -1;
668 }
669
670 configstring_ = bstrdup(dev_options);
671
672 bp = configstring_;
673 while (bp) {
674 next_option = strchr(bp, ',');
675 if (next_option) {
676 *next_option++ = '\0';
677 }
678
679 done = false;
680 for (int i = 0; !done && device_options[i].name; i++) {
681 /*
682 * Try to find a matching device option.
683 */
684 if (bstrncasecmp(bp, device_options[i].name, device_options[i].compare_size)) {
685 switch (device_options[i].type) {
686 case argument_profile: {
687 char *profile;
688
689 /*
690 * Strip any .profile prefix from the libdroplet profile name.
691 */
692 profile = bp + device_options[i].compare_size;
693 len = strlen(profile);
694 if (len > 8 && Bstrcasecmp(profile + (len - 8), ".profile")) {
695 profile[len - 8] = '\0';
696 }
697 profile_ = profile;
698 done = true;
699 break;
700 }
701 case argument_location:
702 location_ = bp + device_options[i].compare_size;
703 done = true;
704 break;
705 case argument_canned_acl:
706 canned_acl_ = bp + device_options[i].compare_size;
707 done = true;
708 break;
709 case argument_storage_class:
710 storage_class_ = bp + device_options[i].compare_size;
711 done = true;
712 break;
713 case argument_bucket:
714 bucketname_ = bp + device_options[i].compare_size;
715 done = true;
716 break;
717 case argument_chunksize:
718 size_to_uint64(bp + device_options[i].compare_size, &value);
719 chunk_size_ = value;
720 done = true;
721 break;
722 case argument_iothreads:
723 size_to_uint64(bp + device_options[i].compare_size, &value);
724 io_threads_ = value & 0xFF;
725 done = true;
726 break;
727 case argument_ioslots:
728 size_to_uint64(bp + device_options[i].compare_size, &value);
729 io_slots_ = value & 0xFF;
730 done = true;
731 break;
732 case argument_retries:
733 size_to_uint64(bp + device_options[i].compare_size, &value);
734 retries_ = value & 0xFF;
735 done = true;
736 break;
737 case argument_mmap:
738 use_mmap_ = true;
739 done = true;
740 break;
741 default:
742 break;
743 }
744 }
745 }
746
747 if (!done) {
748 Mmsg1(errmsg, _("Unable to parse device option: %s\n"), bp);
749 Emsg0(M_FATAL, 0, errmsg);
750 goto bail_out;
751 }
752
753 bp = next_option;
754 }
755
756 if (!profile_) {
757 Mmsg0(errmsg, _("No droplet profile configured\n"));
758 Emsg0(M_FATAL, 0, errmsg);
759 goto bail_out;
760 }
761 }
762
763 /*
764 * See if we need to setup a new context for this device.
765 */
766 if (!ctx_) {
767 char *bp;
768 PoolMem temp(PM_NAME);
769
770 /*
771 * Setup global sysmd settings which are cloned for each operation.
772 */
773 memset(&sysmd_, 0, sizeof(sysmd_));
774 if (location_) {
775 PmStrcpy(temp, location_);
776 sysmd_.mask |= DPL_SYSMD_MASK_LOCATION_CONSTRAINT;
777 sysmd_.location_constraint = dpl_location_constraint(temp.c_str());
778 if (sysmd_.location_constraint == -1) {
779 Mmsg2(errmsg, _("Illegal location argument %s for device %s%s\n"), temp.c_str(), dev_name);
780 goto bail_out;
781 }
782 }
783
784 if (canned_acl_) {
785 PmStrcpy(temp, canned_acl_);
786 sysmd_.mask |= DPL_SYSMD_MASK_CANNED_ACL;
787 sysmd_.canned_acl = dpl_canned_acl(temp.c_str());
788 if (sysmd_.canned_acl == -1) {
789 Mmsg2(errmsg, _("Illegal canned_acl argument %s for device %s%s\n"), temp.c_str(), dev_name);
790 goto bail_out;
791 }
792 }
793
794 if (storage_class_) {
795 PmStrcpy(temp, storage_class_);
796 sysmd_.mask |= DPL_SYSMD_MASK_STORAGE_CLASS;
797 sysmd_.storage_class = dpl_storage_class(temp.c_str());
798 if (sysmd_.storage_class == -1) {
799 Mmsg2(errmsg, _("Illegal storage_class argument %s for device %s%s\n"), temp.c_str(), dev_name);
800 goto bail_out;
801 }
802 }
803
804 /*
805 * See if this is a path.
806 */
807 PmStrcpy(temp, profile_);
808 bp = strrchr(temp.c_str(), '/');
809 if (!bp) {
810 /*
811 * Only a profile name.
812 */
813 ctx_ = dpl_ctx_new(NULL, temp.c_str());
814 } else {
815 if (bp == temp.c_str()) {
816 /*
817 * Profile in root of filesystem
818 */
819 ctx_ = dpl_ctx_new("/", bp + 1);
820 } else {
821 /*
822 * Profile somewhere else.
823 */
824 *bp++ = '\0';
825 ctx_ = dpl_ctx_new(temp.c_str(), bp);
826 }
827 }
828
829 /*
830 * If we failed to allocate a new context fail the open.
831 */
832 if (!ctx_) {
833 Mmsg1(errmsg, _("Failed to create a new context using config %s\n"), dev_options);
834 Dmsg1(100, "%s", errmsg);
835 goto bail_out;
836 }
837
838 /*
839 * Login if that is needed for this backend.
840 */
841 status = dpl_login(ctx_);
842
843 switch (status) {
844 case DPL_SUCCESS:
845 break;
846 case DPL_ENOTSUPP:
847 /*
848 * Backend doesn't support login which is fine.
849 */
850 break;
851 default:
852 Mmsg2(errmsg, _("Failed to login for volume %s using dpl_login(): ERR=%s.\n"),
853 getVolCatName(), dpl_status_str(status));
854 Dmsg1(100, "%s", errmsg);
855 goto bail_out;
856 }
857
858 /*
859 * If a bucketname was defined set it in the context.
860 */
861 if (bucketname_) {
862 ctx_->cur_bucket = bstrdup(bucketname_);
863 }
864 }
865
866 return true;
867
868 bail_out:
869 return false;
870 }
871
872 /*
873 * Open a volume using libdroplet.
874 */
d_open(const char * pathname,int flags,int mode)875 int droplet_device::d_open(const char *pathname, int flags, int mode)
876 {
877 if (!initialize()) {
878 return -1;
879 }
880
881 return SetupChunk(pathname, flags, mode);
882 }
883
/*
 * Read data from a volume using libdroplet.
 *
 * Thin wrapper: forwards all arguments to ReadChunked() and returns its
 * result unchanged.
 */
ssize_t droplet_device::d_read(int fd, void *buffer, size_t count)
{
   return ReadChunked(fd, buffer, count);
}
891
/**
 * Write data to a volume using libdroplet.
 *
 * Thin wrapper: forwards all arguments to WriteChunked() and returns its
 * result unchanged.
 */
ssize_t droplet_device::d_write(int fd, const void *buffer, size_t count)
{
   return WriteChunked(fd, buffer, count);
}
899
/*
 * Close the volume. The file descriptor is not used; the call is forwarded
 * to CloseChunk() and its result is returned.
 */
int droplet_device::d_close(int fd)
{
   return CloseChunk();
}
904
/*
 * ioctl requests are not implemented by this backend: always returns -1
 * without looking at the arguments.
 */
int droplet_device::d_ioctl(int fd, ioctl_req_t request, char *op)
{
   return -1;
}
909
910 /**
911 * Open a directory on the backing store and find out size information for a volume.
912 */
chunked_remote_volume_size()913 ssize_t droplet_device::chunked_remote_volume_size()
914 {
915 ssize_t volumesize = 0;
916 dpl_sysmd_t *sysmd = NULL;
917 PoolMem chunk_dir(PM_FNAME);
918
919 Mmsg(chunk_dir, "/%s", getVolCatName());
920
921 /*
922 * FIXME: With the current version of libdroplet a dpl_getattr() on a directory
923 * fails with DPL_ENOENT even when the directory does exist. All other
924 * operations succeed and as walk_chunks() does a dpl_chdir() anyway
925 * that will fail if the directory doesn't exist for now we should be
926 * mostly fine.
927 */
928
929 #if 0
930 /*
931 * First make sure that the chunkdir exists otherwise it makes little sense to scan it.
932 */
933 sysmd = dpl_sysmd_dup(&sysmd_);
934 status = dpl_getattr(ctx_, /* context */
935 chunk_dir.c_str(), /* locator */
936 NULL, /* metadata */
937 sysmd); /* sysmd */
938
939 switch (status) {
940 case DPL_SUCCESS:
941 /*
942 * Make sure the filetype is a directory and not a file.
943 */
944 if (sysmd->ftype != DPL_FTYPE_DIR) {
945 volumesize = -1;
946 goto bail_out;
947 }
948 break;
949 case DPL_ENOENT:
950 volumesize = -1;
951 goto bail_out;
952 default:
953 break;
954 }
955 #endif
956
957 Dmsg1(100, "get chunked_remote_volume_size(%s)\n", getVolCatName());
958 if (!walk_chunks(chunk_dir.c_str(), chunked_volume_size_callback, &volumesize)) {
959 /* errno is already set in walk_chunks */
960 volumesize = -1;
961 goto bail_out;
962 }
963
964 bail_out:
965 if (sysmd) {
966 dpl_sysmd_free(sysmd);
967 }
968
969 Dmsg2(100, "Size of volume %s: %lld\n", chunk_dir.c_str(), volumesize);
970
971 return volumesize;
972 }
973
d_lseek(DeviceControlRecord * dcr,boffset_t offset,int whence)974 boffset_t droplet_device::d_lseek(DeviceControlRecord *dcr, boffset_t offset, int whence)
975 {
976 switch (whence) {
977 case SEEK_SET:
978 offset_ = offset;
979 break;
980 case SEEK_CUR:
981 offset_ += offset;
982 break;
983 case SEEK_END: {
984 ssize_t volumesize;
985
986 volumesize = ChunkedVolumeSize();
987
988 Dmsg1(100, "Current volumesize: %lld\n", volumesize);
989
990 if (volumesize >= 0) {
991 offset_ = volumesize + offset;
992 } else {
993 return -1;
994 }
995 break;
996 }
997 default:
998 return -1;
999 }
1000
1001 if (!LoadChunk()) {
1002 return -1;
1003 }
1004
1005 return offset_;
1006 }
1007
/*
 * Truncate the volume. Thin wrapper: forwards the device control record to
 * TruncateChunkedVolume() and returns its result.
 */
bool droplet_device::d_truncate(DeviceControlRecord *dcr)
{
   return TruncateChunkedVolume(dcr);
}
1012
~droplet_device()1013 droplet_device::~droplet_device()
1014 {
1015 if (ctx_) {
1016 if (bucketname_ && ctx_->cur_bucket) {
1017 free(ctx_->cur_bucket);
1018 ctx_->cur_bucket = NULL;
1019 }
1020 dpl_ctx_free(ctx_);
1021 ctx_ = NULL;
1022 }
1023
1024 if (configstring_) {
1025 free(configstring_);
1026 }
1027
1028 P(mutex);
1029 droplet_reference_count--;
1030 if (droplet_reference_count == 0) {
1031 dpl_free();
1032 }
1033 V(mutex);
1034 }
1035
droplet_device()1036 droplet_device::droplet_device()
1037 {
1038 configstring_ = NULL;
1039 bucketname_ = NULL;
1040 location_ = NULL;
1041 canned_acl_ = NULL;
1042 storage_class_ = NULL;
1043 ctx_ = NULL;
1044 }
1045
1046 #ifdef HAVE_DYNAMIC_SD_BACKENDS
backend_instantiate(JobControlRecord * jcr,int device_type)1047 extern "C" Device *backend_instantiate(JobControlRecord *jcr, int device_type)
1048 {
1049 Device *dev = NULL;
1050
1051 switch (device_type) {
1052 case B_DROPLET_DEV:
1053 dev = New(droplet_device);
1054 break;
1055 default:
1056 Jmsg(jcr, M_FATAL, 0, _("Request for unknown devicetype: %d\n"), device_type);
1057 break;
1058 }
1059
1060 return dev;
1061 }
1062
/*
 * Second exported entry point of the dynamically loaded backend.
 * Intentionally empty: there is nothing to do here for this backend.
 */
extern "C" void flush_backend(void)
{
}
1066 #endif
1067 } /* namespace storagedaemon */
1068 #endif /* HAVE_DROPLET */
1069