1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2014-2017 Planets Communications B.V.
5    Copyright (C) 2014-2018 Bareos GmbH & Co. KG
6 
7    This program is Free Software; you can redistribute it and/or
8    modify it under the terms of version three of the GNU Affero General Public
9    License as published by the Free Software Foundation and included
10    in the file LICENSE.
11 
12    This program is distributed in the hope that it will be useful, but
13    WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15    General Public License for more details.
16 
17    You should have received a copy of the GNU Affero General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20    02110-1301, USA.
21 */
22 /*
23  * Marco van Wieringen, February 2014
24  *
25  * Object Storage API device abstraction.
26  *
27  * Stacking is the following:
28  *
29  *   droplet_device::
30  *         |
31  *         v
32  *   chunked_device::
33  *         |
34  *         v
35  *       Device::
36  *
37  */
38 /**
39  * @file
40  * Object Storage API device abstraction.
41  */
42 
43 #include "include/bareos.h"
44 
45 #ifdef HAVE_DROPLET
46 #include "stored/stored.h"
47 #include "chunked_device.h"
48 #include "droplet_device.h"
49 #include "lib/edit.h"
50 
51 namespace storagedaemon {
52 
53 /**
54  * Options that can be specified for this device type.
55  */
56 enum device_option_type {
57    argument_none = 0,
58    argument_profile,
59    argument_location,
60    argument_canned_acl,
61    argument_storage_class,
62    argument_bucket,
63    argument_chunksize,
64    argument_iothreads,
65    argument_ioslots,
66    argument_retries,
67    argument_mmap
68 };
69 
70 struct device_option {
71    const char *name;
72    enum device_option_type type;
73    int compare_size;
74 };
75 
76 static device_option device_options[] = {
77    { "profile=", argument_profile, 8 },
78    { "location=", argument_location, 9 },
79    { "acl=", argument_canned_acl, 4 },
80    { "storageclass=", argument_storage_class, 13 },
81    { "bucket=", argument_bucket, 7 },
82    { "chunksize=", argument_chunksize, 10 },
83    { "iothreads=", argument_iothreads, 10 },
84    { "ioslots=", argument_ioslots, 8 },
85    { "retries=", argument_retries, 8 },
86    { "mmap", argument_mmap, 4 },
87    { NULL, argument_none }
88 };
89 
90 static int droplet_reference_count = 0;
91 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
92 
93 /**
94  * Generic log function that glues libdroplet with BAREOS.
95  */
DropletDeviceLogfunc(dpl_ctx_t * ctx,dpl_log_level_t level,const char * message)96 static void DropletDeviceLogfunc(dpl_ctx_t *ctx, dpl_log_level_t level, const char *message)
97 {
98    switch (level) {
99    case DPL_DEBUG:
100       Dmsg1(100, "%s\n", message);
101       break;
102    case DPL_INFO:
103       Emsg1(M_INFO, 0, "%s\n", message);
104       break;
105    case DPL_WARNING:
106       Emsg1(M_WARNING, 0, "%s\n", message);
107       break;
108    case DPL_ERROR:
109       Emsg1(M_ERROR, 0, "%s\n", message);
110       break;
111    default:
112       break;
113    }
114 }
115 
116 /**
117  * Map the droplet errno's to system ones.
118  */
DropletErrnoToSystemErrno(dpl_status_t status)119 static inline int DropletErrnoToSystemErrno(dpl_status_t status)
120 {
121    switch (status) {
122    case DPL_ENOENT:
123       errno = ENOENT;
124       break;
125    case DPL_ETIMEOUT:
126       errno = ETIMEDOUT;
127    case DPL_ENOMEM:
128       errno = ENOMEM;
129       break;
130    case DPL_EIO:
131       errno = EIO;
132       break;
133    case DPL_ENAMETOOLONG:
134       errno = ENAMETOOLONG;
135       break;
136    case DPL_ENOTDIR:
137       errno = ENOTDIR;
138       break;
139    case DPL_ENOTEMPTY:
140       errno = ENOTEMPTY;
141       break;
142    case DPL_EISDIR:
143       errno = EISDIR;
144       break;
145    case DPL_EEXIST:
146       errno = EEXIST;
147       break;
148    case DPL_EPERM:
149       errno = EPERM;
150       break;
151    case DPL_FAILURE: /**< General failure */
152       errno = EIO;
153       break;
154    default:
155       errno = EINVAL;
156       break;
157    }
158 
159    return errno;
160 }
161 
162 
163 
164 /*
165  * Callback for getting the total size of a chunked volume.
166  */
chunked_volume_size_callback(dpl_sysmd_t * sysmd,dpl_ctx_t * ctx,const char * chunkpath,void * data)167 static dpl_status_t chunked_volume_size_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
168 {
169    dpl_status_t status = DPL_SUCCESS;
170    ssize_t *volumesize = (ssize_t *)data;
171 
172    *volumesize = *volumesize + sysmd->size;
173 
174    return status;
175 }
176 
177 /*
178  * Callback for truncating a chunked volume.
179  *
180  * @return DPL_SUCCESS on success, on error: a dpl_status_t value that represents the error.
181  */
chunked_volume_truncate_callback(dpl_sysmd_t * sysmd,dpl_ctx_t * ctx,const char * chunkpath,void * data)182 static dpl_status_t chunked_volume_truncate_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
183 {
184    dpl_status_t status = DPL_SUCCESS;
185 
186    status = dpl_unlink(ctx, chunkpath);
187 
188    switch (status) {
189       case DPL_SUCCESS:
190          break;
191       default:
192          /* no error message here, as error will be set by calling function. */
193          return status;
194    }
195 
196    return status;
197 }
198 
199 
200 /*
201  * Generic function that walks a dirname and calls the callback
202  * function for each entry it finds in that directory.
203  *
204  * @return: true - if no error occured
205  *          false - if an error has occured. Sets dev_errno and errmsg to the first error.
206  */
walk_chunks(const char * dirname,t_dpl_walk_chunks_call_back callback,void * data,bool ignore_gaps)207 bool droplet_device::walk_chunks(const char *dirname, t_dpl_walk_chunks_call_back callback, void *data, bool ignore_gaps)
208 {
209    bool retval = true;
210    dpl_status_t status;
211    dpl_status_t callback_status;
212    dpl_sysmd_t *sysmd = NULL;
213    PoolMem path(PM_NAME);
214 
215    sysmd = dpl_sysmd_dup(&sysmd_);
216    bool found = true;
217    int i = 0;
218    while ((i < max_chunks_) && (found) && (retval)) {
219       path.bsprintf("%s/%04d", dirname, i);
220 
221       status = dpl_getattr(ctx_, /* context */
222                            path.c_str(), /* locator */
223                            NULL, /* metadata */
224                            sysmd); /* sysmd */
225 
226       switch (status) {
227          case DPL_SUCCESS:
228             Dmsg1(100, "chunk %s exists. Calling callback.\n", path.c_str());
229             callback_status = callback(sysmd, ctx_, path.c_str(), data);
230             if (callback_status == DPL_SUCCESS) {
231                i++;
232             } else {
233                Mmsg2(errmsg, _("Operation failed on chunk %s: ERR=%s."),
234                      path.c_str(), dpl_status_str(callback_status));
235                dev_errno = DropletErrnoToSystemErrno(callback_status);
236                /* exit loop */
237                retval = false;
238             }
239             break;
240          case DPL_ENOENT:
241             if (ignore_gaps) {
242                Dmsg1(1000, "chunk %s does not exists. Skipped.\n", path.c_str());
243                i++;
244             } else {
245                Dmsg1(100, "chunk %s does not exists. Exiting.\n", path.c_str());
246                found = false;
247             }
248             break;
249          default:
250             Dmsg2(100, "chunk %s failure: %s. Exiting.\n", path.c_str(), dpl_status_str(callback_status));
251             found = false;
252             break;
253       }
254    }
255 
256    if (sysmd) {
257       dpl_sysmd_free(sysmd);
258       sysmd = NULL;
259    }
260 
261    return retval;
262 }
263 
264 /**
265  * Check if a specific path exists.
266  * It uses dpl_getattr() for this.
267  * However, dpl_getattr() results wrong results in a couple of situations,
268  * espescially directoy names should not be checked using a prepended "/".
269  *
270  * Results in detail:
271  *
272  * path      | "name"  | "name/" | target reachable | target not reachable  | target not reachable | wrong credentials
273  *           | exists  | exists  |                  | (already initialized) | (not initialized)    |
274  * -------------------------------------------------------------------------------------------------------------------
275  * ""        | -       | yes     | DPL_SUCCESS      | DPL_SUCCESS (!)       | DPL_FAILURE          | DPL_EPERM
276  * "/"       | yes     | -       | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
277  *
278  * "name"    | -       | -       | DPL_ENOENT       | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
279  * "/name"   | -       | -       | DPL_ENOENT       | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
280  * "name/"   | -       | -       | DPL_ENOENT       | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
281  * "/name/"  | -       | -       | DPL_SUCCESS (!)  | DPL_SUCCESS (!)       | DPL_SUCCESS (!)      | DPL_SUCCESS (!)
282  *
283  * "name"    | yes     | -       | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
284  * "/name"   | yes     | -       | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
285  * "name/"   | yes     | -       | DPL_ENOTDIR      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
286  * "/name/"  | yes     | -       | DPL_SUCCESS (!)  | DPL_SUCCESS (!)       | DPL_SUCCESS (!)      | DPL_SUCCESS (!)
287  *
288  * "name"    | -       | yes     | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
289  * "/name"   | -       | yes     | DPL_ENOENT  (!)  | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
290  * "name/"   | -       | yes     | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
291  * "/name/"  | -       | yes     | DPL_SUCCESS      | DPL_SUCCESS (!)       | DPL_SUCCESS (!)      | DPL_SUCCESS (!)
292  *
293  * "name"    | yes     | yes     | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
294  * "/name"   | yes     | yes     | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
295  * "name/"   | yes     | yes     | DPL_SUCCESS      | DPL_FAILURE           | DPL_FAILURE          | DPL_EPERM
296  * "/name/"  | yes     | yes     | DPL_SUCCESS      | DPL_SUCCESS (!)       | DPL_SUCCESS (!)      | DPL_SUCCESS (!)
297  *
298  * Best test for
299  *   directories as "dir/"
300  *   files as "file" or "/file".
301  *
302  * Returns DPL_SUCCESS     - if path exists and can be accessed
303  *         DPL_* errorcode - otherwise
304  */
check_path(const char * path)305 dpl_status_t droplet_device::check_path(const char *path)
306 {
307    dpl_status_t status;
308    dpl_sysmd_t *sysmd = NULL;
309 
310    sysmd = dpl_sysmd_dup(&sysmd_);
311    status = dpl_getattr(ctx_, /* context */
312                         path, /* locator */
313                         NULL, /* metadata */
314                         sysmd); /* sysmd */
315    Dmsg4(100, "check_path(device=%s, bucket=%s, path=%s): %s\n", prt_name, ctx_->cur_bucket, path, dpl_status_str(status));
316    dpl_sysmd_free(sysmd);
317 
318    return status;
319 }
320 
321 
322 
323 /**
324  * Checks if the connection to the backend storage system is possible.
325  *
326  * Returns true  - if connection can be established
327  *         false - otherwise
328  */
CheckRemote()329 bool droplet_device::CheckRemote()
330 {
331    if (!ctx_) {
332       if (!initialize()) {
333          return false;
334       }
335    }
336 
337    if (check_path("/") != DPL_SUCCESS) {
338       Dmsg1(100, "CheckRemote(%s): failed\n", prt_name);
339       return false;
340    }
341 
342    Dmsg1(100, "CheckRemote(%s): ok\n", prt_name);
343 
344    return true;
345 }
346 
347 
348 
remote_chunked_volume_exists()349 bool droplet_device::remote_chunked_volume_exists()
350 {
351    bool retval = false;
352    dpl_status_t status;
353    PoolMem chunk_dir(PM_FNAME);
354 
355    if (!CheckRemote()) {
356       return false;
357    }
358 
359    Mmsg(chunk_dir, "%s/", getVolCatName());
360    status = check_path(chunk_dir.c_str());
361 
362    switch (status) {
363    case DPL_SUCCESS:
364       Dmsg1(100, "remote_chunked_volume %s exists\n", chunk_dir.c_str());
365       retval = true;
366       break;
367    case DPL_ENOENT:
368    case DPL_FAILURE:
369    default:
370       Dmsg1(100, "remote_chunked_volume %s does not exists\n", chunk_dir.c_str());
371       break;
372    }
373 
374    return retval;
375 }
376 
377 
378 /*
379  * Internal method for flushing a chunk to the backing store.
380  * This does the real work either by being called from a
381  * io-thread or directly blocking the device.
382  */
FlushRemoteChunk(chunk_io_request * request)383 bool droplet_device::FlushRemoteChunk(chunk_io_request *request)
384 {
385    bool retval = false;
386    dpl_status_t status;
387    dpl_option_t dpl_options;
388    dpl_sysmd_t *sysmd = NULL;
389    PoolMem chunk_dir(PM_FNAME),
390            chunk_name(PM_FNAME);
391 
392    Mmsg(chunk_dir, "/%s", request->volname);
393    Mmsg(chunk_name, "%s/%04d", chunk_dir.c_str(), request->chunk);
394 
395    /*
396     * Set that we are uploading the chunk.
397     */
398    if (!SetInflightChunk(request)) {
399       goto bail_out;
400    }
401 
402    Dmsg1(100, "Flushing chunk %s\n", chunk_name.c_str());
403 
404    /*
405     * Check on the remote backing store if the chunk already exists.
406     * We only upload this chunk if it is bigger then the chunk that exists
407     * on the remote backing store. When using io-threads it could happen
408     * that there are multiple flush requests for the same chunk when a
409     * chunk is reused in a next backup job. We only want the chunk with
410     * the biggest amount of valid data to persist as we only append to
411     * chunks.
412     */
413    sysmd = dpl_sysmd_dup(&sysmd_);
414    status = dpl_getattr(ctx_, /* context */
415                         chunk_name.c_str(), /* locator */
416                         NULL, /* metadata */
417                         sysmd); /* sysmd */
418 
419    switch (status) {
420    case DPL_SUCCESS:
421       if (sysmd->size > request->wbuflen) {
422          retval = true;
423          goto bail_out;
424       }
425       break;
426    default:
427       /*
428        * Check on the remote backing store if the chunkdir exists.
429        */
430       dpl_sysmd_free(sysmd);
431       sysmd = dpl_sysmd_dup(&sysmd_);
432       status = dpl_getattr(ctx_, /* context */
433                            chunk_dir.c_str(), /* locator */
434                            NULL, /* metadata */
435                            sysmd); /* sysmd */
436 
437       switch (status) {
438       case DPL_SUCCESS:
439          break;
440       case DPL_ENOENT:
441       case DPL_FAILURE:
442          /*
443           * Make sure the chunk directory with the name of the volume exists.
444           */
445          dpl_sysmd_free(sysmd);
446          sysmd = dpl_sysmd_dup(&sysmd_);
447          status = dpl_mkdir(ctx_, /* context */
448                             chunk_dir.c_str(), /* locator */
449                             NULL, /* metadata */
450                             sysmd);/* sysmd */
451 
452          switch (status) {
453          case DPL_SUCCESS:
454             break;
455          default:
456             Mmsg2(errmsg, _("Failed to create directory %s using dpl_mkdir(): ERR=%s.\n"),
457                   chunk_dir.c_str(), dpl_status_str(status));
458             dev_errno = DropletErrnoToSystemErrno(status);
459             goto bail_out;
460          }
461          break;
462       default:
463          break;
464       }
465       break;
466    }
467 
468    /*
469     * Create some options for libdroplet.
470     *
471     * DPL_OPTION_NOALLOC - we provide the buffer to copy the data into
472     *                      no need to let the library allocate memory we
473     *                      need to free after copying the data.
474     */
475    memset(&dpl_options, 0, sizeof(dpl_options));
476    dpl_options.mask |= DPL_OPTION_NOALLOC;
477 
478    dpl_sysmd_free(sysmd);
479    sysmd = dpl_sysmd_dup(&sysmd_);
480    status = dpl_fput(ctx_, /* context */
481                      chunk_name.c_str(), /* locator */
482                      &dpl_options, /* options */
483                      NULL, /* condition */
484                      NULL, /* range */
485                      NULL, /* metadata */
486                      sysmd, /* sysmd */
487                      (char *)request->buffer, /* data_buf */
488                      request->wbuflen); /* data_len */
489 
490    switch (status) {
491    case DPL_SUCCESS:
492       break;
493    default:
494       Mmsg2(errmsg, _("Failed to flush %s using dpl_fput(): ERR=%s.\n"),
495             chunk_name.c_str(), dpl_status_str(status));
496       dev_errno = DropletErrnoToSystemErrno(status);
497       goto bail_out;
498    }
499 
500    retval = true;
501 
502 bail_out:
503    /*
504     * Clear that we are uploading the chunk.
505     */
506    ClearInflightChunk(request);
507 
508    if (sysmd) {
509       dpl_sysmd_free(sysmd);
510    }
511 
512    return retval;
513 }
514 
515 /*
516  * Internal method for reading a chunk from the remote backing store.
517  */
ReadRemoteChunk(chunk_io_request * request)518 bool droplet_device::ReadRemoteChunk(chunk_io_request *request)
519 {
520    bool retval = false;
521    dpl_status_t status;
522    dpl_option_t dpl_options;
523    dpl_range_t dpl_range;
524    dpl_sysmd_t *sysmd = NULL;
525    PoolMem chunk_name(PM_FNAME);
526 
527    Mmsg(chunk_name, "/%s/%04d", request->volname, request->chunk);
528    Dmsg1(100, "Reading chunk %s\n", chunk_name.c_str());
529 
530    /*
531     * See if chunk exists.
532     */
533    sysmd = dpl_sysmd_dup(&sysmd_);
534    status = dpl_getattr(ctx_, /* context */
535                         chunk_name.c_str(), /* locator */
536                         NULL, /* metadata */
537                         sysmd); /* sysmd */
538 
539    switch (status) {
540    case DPL_SUCCESS:
541       break;
542    default:
543       Mmsg1(errmsg, _("Failed to open %s doesn't exist\n"), chunk_name.c_str());
544       Dmsg1(100, "%s", errmsg);
545       dev_errno = EIO;
546       goto bail_out;
547    }
548 
549    if (sysmd->size > request->wbuflen) {
550       Mmsg3(errmsg, _("Failed to read %s (%ld) to big to fit in chunksize of %ld bytes\n"),
551             chunk_name.c_str(), sysmd->size, request->wbuflen);
552       Dmsg1(100, "%s", errmsg);
553       dev_errno = EINVAL;
554       goto bail_out;
555    }
556 
557    /*
558     * Create some options for libdroplet.
559     *
560     * DPL_OPTION_NOALLOC - we provide the buffer to copy the data into
561     *                      no need to let the library allocate memory we
562     *                      need to free after copying the data.
563     */
564    memset(&dpl_options, 0, sizeof(dpl_options));
565    dpl_options.mask |= DPL_OPTION_NOALLOC;
566 
567    dpl_range.start = 0;
568    dpl_range.end = sysmd->size;
569    *request->rbuflen = sysmd->size;
570    dpl_sysmd_free(sysmd);
571    sysmd = dpl_sysmd_dup(&sysmd_);
572    status = dpl_fget(ctx_, /* context */
573                      chunk_name.c_str(), /* locator */
574                      &dpl_options, /* options */
575                      NULL, /* condition */
576                      &dpl_range, /* range */
577                      (char **)&request->buffer, /* data_bufp */
578                      request->rbuflen, /* data_lenp */
579                      NULL, /* metadatap */
580                      sysmd); /* sysmdp */
581 
582    switch (status) {
583    case DPL_SUCCESS:
584       break;
585    case DPL_ENOENT:
586       Mmsg1(errmsg, _("Failed to open %s doesn't exist\n"), chunk_name.c_str());
587       Dmsg1(100, "%s", errmsg);
588       dev_errno = EIO;
589       goto bail_out;
590    default:
591       Mmsg2(errmsg, _("Failed to read %s using dpl_fget(): ERR=%s.\n"),
592             chunk_name.c_str(), dpl_status_str(status));
593       dev_errno = DropletErrnoToSystemErrno(status);
594       goto bail_out;
595    }
596 
597    retval = true;
598 
599 bail_out:
600    if (sysmd) {
601       dpl_sysmd_free(sysmd);
602    }
603 
604    return retval;
605 }
606 
607 /*
608  * Internal method for truncating a chunked volume on the remote backing store.
609  */
TruncateRemoteChunkedVolume(DeviceControlRecord * dcr)610 bool droplet_device::TruncateRemoteChunkedVolume(DeviceControlRecord *dcr)
611 {
612    PoolMem chunk_dir(PM_FNAME);
613 
614    Dmsg1(100, "truncate_remote_chunked_volume(%s) start.\n", getVolCatName());
615    Mmsg(chunk_dir, "/%s", getVolCatName());
616    bool ignore_gaps = true;
617    if (!walk_chunks(chunk_dir.c_str(), chunked_volume_truncate_callback, NULL, ignore_gaps)) {
618       /* errno already set in walk_chunks. */
619       return false;
620    }
621    Dmsg1(100, "truncate_remote_chunked_volume(%s) finished.\n", getVolCatName());
622 
623    return true;
624 }
625 
626 
d_flush(DeviceControlRecord * dcr)627 bool droplet_device::d_flush(DeviceControlRecord *dcr)
628 {
629    return WaitUntilChunksWritten();
630 };
631 
632 /*
633  * Initialize backend.
634  */
initialize()635 bool droplet_device::initialize()
636 {
637    dpl_status_t status;
638 
639    /*
640     * Initialize the droplet library when its not done previously.
641     */
642    P(mutex);
643    if (droplet_reference_count == 0) {
644       dpl_set_log_func(DropletDeviceLogfunc);
645 
646       status = dpl_init();
647       switch (status) {
648       case DPL_SUCCESS:
649          break;
650       default:
651          V(mutex);
652          goto bail_out;
653       }
654    }
655    droplet_reference_count++;
656    V(mutex);
657 
658    if (!configstring_) {
659       int len;
660       bool done;
661       uint64_t value;
662       char *bp, *next_option;
663 
664       if (!dev_options) {
665          Mmsg0(errmsg, _("No device options configured\n"));
666          Emsg0(M_FATAL, 0, errmsg);
667          return -1;
668       }
669 
670       configstring_ = bstrdup(dev_options);
671 
672       bp = configstring_;
673       while (bp) {
674          next_option = strchr(bp, ',');
675          if (next_option) {
676             *next_option++ = '\0';
677          }
678 
679          done = false;
680          for (int i = 0; !done && device_options[i].name; i++) {
681             /*
682              * Try to find a matching device option.
683              */
684             if (bstrncasecmp(bp, device_options[i].name, device_options[i].compare_size)) {
685                switch (device_options[i].type) {
686                case argument_profile: {
687                   char *profile;
688 
689                   /*
690                    * Strip any .profile prefix from the libdroplet profile name.
691                    */
692                   profile = bp + device_options[i].compare_size;
693                   len = strlen(profile);
694                   if (len > 8 && Bstrcasecmp(profile + (len - 8), ".profile")) {
695                      profile[len - 8] = '\0';
696                   }
697                   profile_ = profile;
698                   done = true;
699                   break;
700                }
701                case argument_location:
702                   location_ = bp + device_options[i].compare_size;
703                   done = true;
704                   break;
705                case argument_canned_acl:
706                   canned_acl_ = bp + device_options[i].compare_size;
707                   done = true;
708                   break;
709                case argument_storage_class:
710                   storage_class_ = bp + device_options[i].compare_size;
711                   done = true;
712                   break;
713                case argument_bucket:
714                   bucketname_ = bp + device_options[i].compare_size;
715                   done = true;
716                   break;
717                case argument_chunksize:
718                   size_to_uint64(bp + device_options[i].compare_size, &value);
719                   chunk_size_ = value;
720                   done = true;
721                   break;
722                case argument_iothreads:
723                   size_to_uint64(bp + device_options[i].compare_size, &value);
724                   io_threads_ = value & 0xFF;
725                   done = true;
726                   break;
727                case argument_ioslots:
728                   size_to_uint64(bp + device_options[i].compare_size, &value);
729                   io_slots_ = value & 0xFF;
730                   done = true;
731                   break;
732                case argument_retries:
733                   size_to_uint64(bp + device_options[i].compare_size, &value);
734                   retries_ = value & 0xFF;
735                   done = true;
736                   break;
737                case argument_mmap:
738                   use_mmap_ = true;
739                   done = true;
740                   break;
741                default:
742                   break;
743                }
744             }
745          }
746 
747          if (!done) {
748             Mmsg1(errmsg, _("Unable to parse device option: %s\n"), bp);
749             Emsg0(M_FATAL, 0, errmsg);
750             goto bail_out;
751          }
752 
753          bp = next_option;
754       }
755 
756       if (!profile_) {
757          Mmsg0(errmsg, _("No droplet profile configured\n"));
758          Emsg0(M_FATAL, 0, errmsg);
759          goto bail_out;
760       }
761    }
762 
763    /*
764     * See if we need to setup a new context for this device.
765     */
766    if (!ctx_) {
767       char *bp;
768       PoolMem temp(PM_NAME);
769 
770       /*
771        * Setup global sysmd settings which are cloned for each operation.
772        */
773       memset(&sysmd_, 0, sizeof(sysmd_));
774       if (location_) {
775          PmStrcpy(temp, location_);
776          sysmd_.mask |= DPL_SYSMD_MASK_LOCATION_CONSTRAINT;
777          sysmd_.location_constraint = dpl_location_constraint(temp.c_str());
778          if (sysmd_.location_constraint == -1) {
779             Mmsg2(errmsg, _("Illegal location argument %s for device %s%s\n"), temp.c_str(), dev_name);
780             goto bail_out;
781          }
782       }
783 
784       if (canned_acl_) {
785          PmStrcpy(temp, canned_acl_);
786          sysmd_.mask |= DPL_SYSMD_MASK_CANNED_ACL;
787          sysmd_.canned_acl = dpl_canned_acl(temp.c_str());
788          if (sysmd_.canned_acl == -1) {
789             Mmsg2(errmsg, _("Illegal canned_acl argument %s for device %s%s\n"), temp.c_str(), dev_name);
790             goto bail_out;
791          }
792       }
793 
794       if (storage_class_) {
795          PmStrcpy(temp, storage_class_);
796          sysmd_.mask |= DPL_SYSMD_MASK_STORAGE_CLASS;
797          sysmd_.storage_class = dpl_storage_class(temp.c_str());
798          if (sysmd_.storage_class == -1) {
799             Mmsg2(errmsg, _("Illegal storage_class argument %s for device %s%s\n"), temp.c_str(), dev_name);
800             goto bail_out;
801          }
802       }
803 
804       /*
805        * See if this is a path.
806        */
807       PmStrcpy(temp, profile_);
808       bp = strrchr(temp.c_str(), '/');
809       if (!bp) {
810          /*
811           * Only a profile name.
812           */
813          ctx_ = dpl_ctx_new(NULL, temp.c_str());
814       } else {
815          if (bp == temp.c_str()) {
816             /*
817              * Profile in root of filesystem
818              */
819             ctx_ = dpl_ctx_new("/", bp + 1);
820          } else {
821             /*
822              * Profile somewhere else.
823              */
824             *bp++ = '\0';
825             ctx_ = dpl_ctx_new(temp.c_str(), bp);
826          }
827       }
828 
829       /*
830        * If we failed to allocate a new context fail the open.
831        */
832       if (!ctx_) {
833          Mmsg1(errmsg, _("Failed to create a new context using config %s\n"), dev_options);
834          Dmsg1(100, "%s", errmsg);
835          goto bail_out;
836       }
837 
838       /*
839        * Login if that is needed for this backend.
840        */
841       status = dpl_login(ctx_);
842 
843       switch (status) {
844       case DPL_SUCCESS:
845          break;
846       case DPL_ENOTSUPP:
847          /*
848           * Backend doesn't support login which is fine.
849           */
850          break;
851       default:
852          Mmsg2(errmsg, _("Failed to login for volume %s using dpl_login(): ERR=%s.\n"),
853                getVolCatName(), dpl_status_str(status));
854          Dmsg1(100, "%s", errmsg);
855          goto bail_out;
856       }
857 
858       /*
859        * If a bucketname was defined set it in the context.
860        */
861       if (bucketname_) {
862          ctx_->cur_bucket = bstrdup(bucketname_);
863       }
864    }
865 
866    return true;
867 
868 bail_out:
869    return false;
870 }
871 
872 /*
873  * Open a volume using libdroplet.
874  */
d_open(const char * pathname,int flags,int mode)875 int droplet_device::d_open(const char *pathname, int flags, int mode)
876 {
877    if (!initialize()) {
878       return -1;
879    }
880 
881    return SetupChunk(pathname, flags, mode);
882 }
883 
884 /*
885  * Read data from a volume using libdroplet.
886  */
d_read(int fd,void * buffer,size_t count)887 ssize_t droplet_device::d_read(int fd, void *buffer, size_t count)
888 {
889    return ReadChunked(fd, buffer, count);
890 }
891 
892 /**
893  * Write data to a volume using libdroplet.
894  */
d_write(int fd,const void * buffer,size_t count)895 ssize_t droplet_device::d_write(int fd, const void *buffer, size_t count)
896 {
897    return WriteChunked(fd, buffer, count);
898 }
899 
d_close(int fd)900 int droplet_device::d_close(int fd)
901 {
902    return CloseChunk();
903 }
904 
d_ioctl(int fd,ioctl_req_t request,char * op)905 int droplet_device::d_ioctl(int fd, ioctl_req_t request, char *op)
906 {
907    return -1;
908 }
909 
910 /**
911  * Open a directory on the backing store and find out size information for a volume.
912  */
chunked_remote_volume_size()913 ssize_t droplet_device::chunked_remote_volume_size()
914 {
915    ssize_t volumesize = 0;
916    dpl_sysmd_t *sysmd = NULL;
917    PoolMem chunk_dir(PM_FNAME);
918 
919    Mmsg(chunk_dir, "/%s", getVolCatName());
920 
921    /*
922     * FIXME: With the current version of libdroplet a dpl_getattr() on a directory
923     *        fails with DPL_ENOENT even when the directory does exist. All other
924     *        operations succeed and as walk_chunks() does a dpl_chdir() anyway
925     *        that will fail if the directory doesn't exist for now we should be
926     *        mostly fine.
927     */
928 
929 #if 0
930    /*
931     * First make sure that the chunkdir exists otherwise it makes little sense to scan it.
932     */
933    sysmd = dpl_sysmd_dup(&sysmd_);
934    status = dpl_getattr(ctx_, /* context */
935                         chunk_dir.c_str(), /* locator */
936                         NULL, /* metadata */
937                         sysmd); /* sysmd */
938 
939    switch (status) {
940    case DPL_SUCCESS:
941       /*
942        * Make sure the filetype is a directory and not a file.
943        */
944       if (sysmd->ftype != DPL_FTYPE_DIR) {
945          volumesize = -1;
946          goto bail_out;
947       }
948       break;
949    case DPL_ENOENT:
950       volumesize = -1;
951       goto bail_out;
952    default:
953       break;
954    }
955 #endif
956 
957    Dmsg1(100, "get chunked_remote_volume_size(%s)\n", getVolCatName());
958    if (!walk_chunks(chunk_dir.c_str(), chunked_volume_size_callback, &volumesize)) {
959       /* errno is already set in walk_chunks */
960       volumesize = -1;
961       goto bail_out;
962    }
963 
964 bail_out:
965    if (sysmd) {
966       dpl_sysmd_free(sysmd);
967    }
968 
969    Dmsg2(100, "Size of volume %s: %lld\n", chunk_dir.c_str(), volumesize);
970 
971    return volumesize;
972 }
973 
d_lseek(DeviceControlRecord * dcr,boffset_t offset,int whence)974 boffset_t droplet_device::d_lseek(DeviceControlRecord *dcr, boffset_t offset, int whence)
975 {
976    switch (whence) {
977    case SEEK_SET:
978       offset_ = offset;
979       break;
980    case SEEK_CUR:
981       offset_ += offset;
982       break;
983    case SEEK_END: {
984       ssize_t volumesize;
985 
986       volumesize = ChunkedVolumeSize();
987 
988       Dmsg1(100, "Current volumesize: %lld\n", volumesize);
989 
990       if (volumesize >= 0) {
991          offset_ = volumesize + offset;
992       } else {
993          return -1;
994       }
995       break;
996    }
997    default:
998       return -1;
999    }
1000 
1001    if (!LoadChunk()) {
1002       return -1;
1003    }
1004 
1005    return offset_;
1006 }
1007 
d_truncate(DeviceControlRecord * dcr)1008 bool droplet_device::d_truncate(DeviceControlRecord *dcr)
1009 {
1010    return TruncateChunkedVolume(dcr);
1011 }
1012 
~droplet_device()1013 droplet_device::~droplet_device()
1014 {
1015    if (ctx_) {
1016       if (bucketname_ && ctx_->cur_bucket) {
1017          free(ctx_->cur_bucket);
1018          ctx_->cur_bucket = NULL;
1019       }
1020       dpl_ctx_free(ctx_);
1021       ctx_ = NULL;
1022    }
1023 
1024    if (configstring_) {
1025       free(configstring_);
1026    }
1027 
1028    P(mutex);
1029    droplet_reference_count--;
1030    if (droplet_reference_count == 0) {
1031       dpl_free();
1032    }
1033    V(mutex);
1034 }
1035 
droplet_device()1036 droplet_device::droplet_device()
1037 {
1038    configstring_ = NULL;
1039    bucketname_ = NULL;
1040    location_ = NULL;
1041    canned_acl_ = NULL;
1042    storage_class_ = NULL;
1043    ctx_ = NULL;
1044 }
1045 
1046 #ifdef HAVE_DYNAMIC_SD_BACKENDS
backend_instantiate(JobControlRecord * jcr,int device_type)1047 extern "C" Device *backend_instantiate(JobControlRecord *jcr, int device_type)
1048 {
1049    Device *dev = NULL;
1050 
1051    switch (device_type) {
1052    case B_DROPLET_DEV:
1053       dev = New(droplet_device);
1054       break;
1055    default:
1056       Jmsg(jcr, M_FATAL, 0, _("Request for unknown devicetype: %d\n"), device_type);
1057       break;
1058    }
1059 
1060    return dev;
1061 }
1062 
flush_backend(void)1063 extern "C" void flush_backend(void)
1064 {
1065 }
1066 #endif
1067 } /* namespace storagedaemon */
1068 #endif /* HAVE_DROPLET */
1069