xref: /qemu/block/rbd.c (revision 47e6b297)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "block/block_int.h"
20 #include "crypto/secret.h"
21 #include "qemu/cutils.h"
22 #include "qapi/qmp/qstring.h"
23 #include "qapi/qmp/qjson.h"
24 #include "qapi/qmp/qlist.h"
25 
26 /*
27  * When specifying the image filename use:
28  *
29  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
30  *
31  * poolname must be the name of an existing rados pool.
32  *
33  * devicename is the name of the rbd image.
34  *
35  * Each option given is used to configure rados, and may be any valid
36  * Ceph option, "id", or "conf".
37  *
38  * The "id" option indicates what user we should authenticate as to
39  * the Ceph cluster.  If it is excluded we will use the Ceph default
40  * (normally 'admin').
41  *
42  * The "conf" option specifies a Ceph configuration file to read.  If
43  * it is not specified, we will read from the default Ceph locations
44  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
45  * file, specify conf=/dev/null.
46  *
47  * Configuration values containing :, @, or = can be escaped with a
48  * leading "\".
49  */
50 
51 /* rbd_aio_discard added in 0.1.2 */
52 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
53 #define LIBRBD_SUPPORTS_DISCARD
54 #else
55 #undef LIBRBD_SUPPORTS_DISCARD
56 #endif
57 
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the part of the
 * file visible here, and nothing visible uses OBJ_MAX_SIZE -- confirm this
 * macro is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * Initial capacity of the snapshot array that qemu_rbd_snap_list() hands
 * to rbd_snap_list().
 */
#define RBD_MAX_SNAPS 100
61 
62 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
63 #ifdef LIBRBD_SUPPORTS_IOVEC
64 #define LIBRBD_USE_IOVEC 1
65 #else
66 #define LIBRBD_USE_IOVEC 0
67 #endif
68 
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
75 
/*
 * Per-request state of one asynchronous RBD operation, allocated by
 * rbd_start_aio() and released in qemu_rbd_complete_aio().
 */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB; holds the completion cb/opaque */
    int64_t ret;            /* negative error code, or a size on success */
    QEMUIOVector *qiov;     /* caller's I/O vector; NULL for discard/flush */
    char *bounce;           /* bounce buffer, only used if !LIBRBD_USE_IOVEC */
    RBDAIOCmd cmd;          /* which operation this request performs */
    int error;              /* set to 1 once the request has failed */
    struct BDRVRBDState *s; /* driver state of the node the request is on */
} RBDAIOCB;
85 
/*
 * Context handed to librbd as the completion argument; it carries the
 * result from rbd_finish_aiocb() to qemu_rbd_complete_aio(), which frees it.
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* request this completion belongs to */
    struct BDRVRBDState *s; /* copy of acb->s */
    int64_t size;           /* request length in bytes */
    char *buf;              /* acb->bounce; only set if !LIBRBD_USE_IOVEC */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
93 
/* Driver state of an open RBD node (bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;        /* handle created by rados_create() */
    rados_ioctx_t io_ctx;   /* pool I/O context from rados_ioctx_create() */
    rbd_image_t image;      /* image handle filled in by rbd_open() */
    char *image_name;       /* g_strdup()ed image name, freed on close */
    char *snap;             /* g_strdup()ed snapshot name, or NULL */
} BDRVRBDState;
101 
/*
 * Split off the first token of @src, in place.
 *
 * Scans @src for the first @delim that is not preceded by a backslash.
 * When one is found it is overwritten with '\0' and *@p is set to the
 * character following it; otherwise *@p is set to NULL and @src is left
 * untouched.  Escape sequences are kept as they are (see
 * qemu_rbd_unescape()).
 *
 * Returns @src, which now is the NUL-terminated token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *cur = src;

    *p = NULL;

    while (*cur != '\0' && *cur != delim) {
        /* a backslash protects the next character: step over the pair */
        if (cur[0] == '\\' && cur[1] != '\0') {
            cur++;
        }
        cur++;
    }

    if (*cur == delim) {
        *cur = '\0';
        *p = cur + 1;
    }
    return src;
}
122 
/*
 * Remove backslash escapes from @src, in place.
 *
 * Every backslash that is followed by another character is dropped and
 * that character is kept literally.  A backslash that is the last
 * character of the string is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
135 
136 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
137                                     Error **errp)
138 {
139     const char *start;
140     char *p, *buf;
141     QList *keypairs = NULL;
142     char *found_str;
143 
144     if (!strstart(filename, "rbd:", &start)) {
145         error_setg(errp, "File name must start with 'rbd:'");
146         return;
147     }
148 
149     buf = g_strdup(start);
150     p = buf;
151 
152     found_str = qemu_rbd_next_tok(p, '/', &p);
153     if (!p) {
154         error_setg(errp, "Pool name is required");
155         goto done;
156     }
157     qemu_rbd_unescape(found_str);
158     qdict_put_str(options, "pool", found_str);
159 
160     if (strchr(p, '@')) {
161         found_str = qemu_rbd_next_tok(p, '@', &p);
162         qemu_rbd_unescape(found_str);
163         qdict_put_str(options, "image", found_str);
164 
165         found_str = qemu_rbd_next_tok(p, ':', &p);
166         qemu_rbd_unescape(found_str);
167         qdict_put_str(options, "snapshot", found_str);
168     } else {
169         found_str = qemu_rbd_next_tok(p, ':', &p);
170         qemu_rbd_unescape(found_str);
171         qdict_put_str(options, "image", found_str);
172     }
173     if (!p) {
174         goto done;
175     }
176 
177     /* The following are essentially all key/value pairs, and we treat
178      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
179     while (p) {
180         char *name, *value;
181         name = qemu_rbd_next_tok(p, '=', &p);
182         if (!p) {
183             error_setg(errp, "conf option %s has no value", name);
184             break;
185         }
186 
187         qemu_rbd_unescape(name);
188 
189         value = qemu_rbd_next_tok(p, ':', &p);
190         qemu_rbd_unescape(value);
191 
192         if (!strcmp(name, "conf")) {
193             qdict_put_str(options, "conf", value);
194         } else if (!strcmp(name, "id")) {
195             qdict_put_str(options, "user", value);
196         } else {
197             /*
198              * We pass these internally to qemu_rbd_set_keypairs(), so
199              * we can get away with the simpler list of [ "key1",
200              * "value1", "key2", "value2" ] rather than a raw dict
201              * { "key1": "value1", "key2": "value2" } where we can't
202              * guarantee order, or even a more correct but complex
203              * [ { "key1": "value1" }, { "key2": "value2" } ]
204              */
205             if (!keypairs) {
206                 keypairs = qlist_new();
207             }
208             qlist_append_str(keypairs, name);
209             qlist_append_str(keypairs, value);
210         }
211     }
212 
213     if (keypairs) {
214         qdict_put(options, "=keyvalue-pairs",
215                   qobject_to_json(QOBJECT(keypairs)));
216     }
217 
218 done:
219     g_free(buf);
220     QDECREF(keypairs);
221     return;
222 }
223 
224 
225 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
226                              Error **errp)
227 {
228     if (secretid == 0) {
229         return 0;
230     }
231 
232     gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
233                                                     errp);
234     if (!secret) {
235         return -1;
236     }
237 
238     rados_conf_set(cluster, "key", secret);
239     g_free(secret);
240 
241     return 0;
242 }
243 
244 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
245                                  Error **errp)
246 {
247     QList *keypairs;
248     QString *name;
249     QString *value;
250     const char *key;
251     size_t remaining;
252     int ret = 0;
253 
254     if (!keypairs_json) {
255         return ret;
256     }
257     keypairs = qobject_to_qlist(qobject_from_json(keypairs_json,
258                                                   &error_abort));
259     remaining = qlist_size(keypairs) / 2;
260     assert(remaining);
261 
262     while (remaining--) {
263         name = qobject_to_qstring(qlist_pop(keypairs));
264         value = qobject_to_qstring(qlist_pop(keypairs));
265         assert(name && value);
266         key = qstring_get_str(name);
267 
268         ret = rados_conf_set(cluster, key, qstring_get_str(value));
269         QDECREF(name);
270         QDECREF(value);
271         if (ret < 0) {
272             error_setg_errno(errp, -ret, "invalid conf option %s", key);
273             ret = -EINVAL;
274             break;
275         }
276     }
277 
278     QDECREF(keypairs);
279     return ret;
280 }
281 
282 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
283 {
284     if (LIBRBD_USE_IOVEC) {
285         RBDAIOCB *acb = rcb->acb;
286         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
287                    acb->qiov->size - offs);
288     } else {
289         memset(rcb->buf + offs, 0, rcb->size - offs);
290     }
291 }
292 
/*
 * Options qemu_rbd_open() absorbs from its options QDict.  All of them
 * are strings; the server.* keys are not listed here because they are
 * extracted separately by qemu_rbd_mon_host().
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "pool",
            .type = QEMU_OPT_STRING,
            .help = "Rados pool name",
        },
        {
            .name = "image",
            .type = QEMU_OPT_STRING,
            .help = "Image name in the pool",
        },
        {
            .name = "conf",
            .type = QEMU_OPT_STRING,
            .help = "Rados config file location",
        },
        {
            .name = "snapshot",
            .type = QEMU_OPT_STRING,
            .help = "Ceph snapshot name",
        },
        {
            /* maps to 'id' in rados_create() */
            .name = "user",
            .type = QEMU_OPT_STRING,
            .help = "Rados id name",
        },
        /*
         * server.* extracted manually, see qemu_rbd_mon_host()
         */
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },

        /*
         * Keys for qemu_rbd_parse_filename(), not in the QAPI schema
         */
        {
            /*
             * HACK: name starts with '=' so that qemu_opts_parse()
             * can't set it
             */
            .name = "=keyvalue-pairs",
            .type = QEMU_OPT_STRING,
            .help = "Legacy rados key/value option parameters",
        },
        {
            /* legacy "rbd:..." pseudo file name, parsed in qemu_rbd_open() */
            .name = "filename",
            .type = QEMU_OPT_STRING,
        },
        { /* end of list */ }
    },
};
351 
352 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
353 {
354     Error *local_err = NULL;
355     int64_t bytes = 0;
356     int64_t objsize;
357     int obj_order = 0;
358     const char *pool, *image_name, *conf, *user, *keypairs;
359     const char *secretid;
360     rados_t cluster;
361     rados_ioctx_t io_ctx;
362     QDict *options = NULL;
363     int ret = 0;
364 
365     secretid = qemu_opt_get(opts, "password-secret");
366 
367     /* Read out options */
368     bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
369                      BDRV_SECTOR_SIZE);
370     objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
371     if (objsize) {
372         if ((objsize - 1) & objsize) {    /* not a power of 2? */
373             error_setg(errp, "obj size needs to be power of 2");
374             ret = -EINVAL;
375             goto exit;
376         }
377         if (objsize < 4096) {
378             error_setg(errp, "obj size too small");
379             ret = -EINVAL;
380             goto exit;
381         }
382         obj_order = ctz32(objsize);
383     }
384 
385     options = qdict_new();
386     qemu_rbd_parse_filename(filename, options, &local_err);
387     if (local_err) {
388         ret = -EINVAL;
389         error_propagate(errp, local_err);
390         goto exit;
391     }
392 
393     /*
394      * Caution: while qdict_get_try_str() is fine, getting non-string
395      * types would require more care.  When @options come from -blockdev
396      * or blockdev_add, its members are typed according to the QAPI
397      * schema, but when they come from -drive, they're all QString.
398      */
399     pool       = qdict_get_try_str(options, "pool");
400     conf       = qdict_get_try_str(options, "conf");
401     user       = qdict_get_try_str(options, "user");
402     image_name = qdict_get_try_str(options, "image");
403     keypairs   = qdict_get_try_str(options, "=keyvalue-pairs");
404 
405     ret = rados_create(&cluster, user);
406     if (ret < 0) {
407         error_setg_errno(errp, -ret, "error initializing");
408         goto exit;
409     }
410 
411     /* try default location when conf=NULL, but ignore failure */
412     ret = rados_conf_read_file(cluster, conf);
413     if (conf && ret < 0) {
414         error_setg_errno(errp, -ret, "error reading conf file %s", conf);
415         ret = -EIO;
416         goto shutdown;
417     }
418 
419     ret = qemu_rbd_set_keypairs(cluster, keypairs, errp);
420     if (ret < 0) {
421         ret = -EIO;
422         goto shutdown;
423     }
424 
425     if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
426         ret = -EIO;
427         goto shutdown;
428     }
429 
430     ret = rados_connect(cluster);
431     if (ret < 0) {
432         error_setg_errno(errp, -ret, "error connecting");
433         goto shutdown;
434     }
435 
436     ret = rados_ioctx_create(cluster, pool, &io_ctx);
437     if (ret < 0) {
438         error_setg_errno(errp, -ret, "error opening pool %s", pool);
439         goto shutdown;
440     }
441 
442     ret = rbd_create(io_ctx, image_name, bytes, &obj_order);
443     if (ret < 0) {
444         error_setg_errno(errp, -ret, "error rbd create");
445     }
446 
447     rados_ioctx_destroy(io_ctx);
448 
449 shutdown:
450     rados_shutdown(cluster);
451 
452 exit:
453     QDECREF(options);
454     return ret;
455 }
456 
457 /*
458  * This aio completion is being called from rbd_finish_bh() and runs in qemu
459  * BH context.
460  */
461 static void qemu_rbd_complete_aio(RADOSCB *rcb)
462 {
463     RBDAIOCB *acb = rcb->acb;
464     int64_t r;
465 
466     r = rcb->ret;
467 
468     if (acb->cmd != RBD_AIO_READ) {
469         if (r < 0) {
470             acb->ret = r;
471             acb->error = 1;
472         } else if (!acb->error) {
473             acb->ret = rcb->size;
474         }
475     } else {
476         if (r < 0) {
477             qemu_rbd_memset(rcb, 0);
478             acb->ret = r;
479             acb->error = 1;
480         } else if (r < rcb->size) {
481             qemu_rbd_memset(rcb, r);
482             if (!acb->error) {
483                 acb->ret = rcb->size;
484             }
485         } else if (!acb->error) {
486             acb->ret = r;
487         }
488     }
489 
490     g_free(rcb);
491 
492     if (!LIBRBD_USE_IOVEC) {
493         if (acb->cmd == RBD_AIO_READ) {
494             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
495         }
496         qemu_vfree(acb->bounce);
497     }
498 
499     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
500 
501     qemu_aio_unref(acb);
502 }
503 
504 static char *qemu_rbd_mon_host(QDict *options, Error **errp)
505 {
506     const char **vals = g_new(const char *, qdict_size(options) + 1);
507     char keybuf[32];
508     const char *host, *port;
509     char *rados_str;
510     int i;
511 
512     for (i = 0;; i++) {
513         sprintf(keybuf, "server.%d.host", i);
514         host = qdict_get_try_str(options, keybuf);
515         qdict_del(options, keybuf);
516         sprintf(keybuf, "server.%d.port", i);
517         port = qdict_get_try_str(options, keybuf);
518         qdict_del(options, keybuf);
519         if (!host && !port) {
520             break;
521         }
522         if (!host) {
523             error_setg(errp, "Parameter server.%d.host is missing", i);
524             rados_str = NULL;
525             goto out;
526         }
527 
528         if (strchr(host, ':')) {
529             vals[i] = port ? g_strdup_printf("[%s]:%s", host, port)
530                 : g_strdup_printf("[%s]", host);
531         } else {
532             vals[i] = port ? g_strdup_printf("%s:%s", host, port)
533                 : g_strdup(host);
534         }
535     }
536     vals[i] = NULL;
537 
538     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
539 out:
540     g_strfreev((char **)vals);
541     return rados_str;
542 }
543 
544 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
545                          Error **errp)
546 {
547     BDRVRBDState *s = bs->opaque;
548     const char *pool, *snap, *conf, *user, *image_name, *keypairs;
549     const char *secretid, *filename;
550     QemuOpts *opts;
551     Error *local_err = NULL;
552     char *mon_host = NULL;
553     int r;
554 
555     /* If we are given a filename, parse the filename, with precedence given to
556      * filename encoded options */
557     filename = qdict_get_try_str(options, "filename");
558     if (filename) {
559         warn_report("'filename' option specified. "
560                     "This is an unsupported option, and may be deprecated "
561                     "in the future");
562         qemu_rbd_parse_filename(filename, options, &local_err);
563         if (local_err) {
564             r = -EINVAL;
565             error_propagate(errp, local_err);
566             goto exit;
567         }
568     }
569 
570     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
571     qemu_opts_absorb_qdict(opts, options, &local_err);
572     if (local_err) {
573         error_propagate(errp, local_err);
574         r = -EINVAL;
575         goto failed_opts;
576     }
577 
578     mon_host = qemu_rbd_mon_host(options, &local_err);
579     if (local_err) {
580         error_propagate(errp, local_err);
581         r = -EINVAL;
582         goto failed_opts;
583     }
584 
585     secretid = qemu_opt_get(opts, "password-secret");
586 
587     pool           = qemu_opt_get(opts, "pool");
588     conf           = qemu_opt_get(opts, "conf");
589     snap           = qemu_opt_get(opts, "snapshot");
590     user           = qemu_opt_get(opts, "user");
591     image_name     = qemu_opt_get(opts, "image");
592     keypairs       = qemu_opt_get(opts, "=keyvalue-pairs");
593 
594     if (!pool || !image_name) {
595         error_setg(errp, "Parameters 'pool' and 'image' are required");
596         r = -EINVAL;
597         goto failed_opts;
598     }
599 
600     r = rados_create(&s->cluster, user);
601     if (r < 0) {
602         error_setg_errno(errp, -r, "error initializing");
603         goto failed_opts;
604     }
605 
606     s->snap = g_strdup(snap);
607     s->image_name = g_strdup(image_name);
608 
609     /* try default location when conf=NULL, but ignore failure */
610     r = rados_conf_read_file(s->cluster, conf);
611     if (conf && r < 0) {
612         error_setg_errno(errp, -r, "error reading conf file %s", conf);
613         goto failed_shutdown;
614     }
615 
616     r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp);
617     if (r < 0) {
618         goto failed_shutdown;
619     }
620 
621     if (mon_host) {
622         r = rados_conf_set(s->cluster, "mon_host", mon_host);
623         if (r < 0) {
624             goto failed_shutdown;
625         }
626     }
627 
628     if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
629         r = -EIO;
630         goto failed_shutdown;
631     }
632 
633     /*
634      * Fallback to more conservative semantics if setting cache
635      * options fails. Ignore errors from setting rbd_cache because the
636      * only possible error is that the option does not exist, and
637      * librbd defaults to no caching. If write through caching cannot
638      * be set up, fall back to no caching.
639      */
640     if (flags & BDRV_O_NOCACHE) {
641         rados_conf_set(s->cluster, "rbd_cache", "false");
642     } else {
643         rados_conf_set(s->cluster, "rbd_cache", "true");
644     }
645 
646     r = rados_connect(s->cluster);
647     if (r < 0) {
648         error_setg_errno(errp, -r, "error connecting");
649         goto failed_shutdown;
650     }
651 
652     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
653     if (r < 0) {
654         error_setg_errno(errp, -r, "error opening pool %s", pool);
655         goto failed_shutdown;
656     }
657 
658     /* rbd_open is always r/w */
659     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
660     if (r < 0) {
661         error_setg_errno(errp, -r, "error reading header from %s",
662                          s->image_name);
663         goto failed_open;
664     }
665 
666     /* If we are using an rbd snapshot, we must be r/o, otherwise
667      * leave as-is */
668     if (s->snap != NULL) {
669         if (!bdrv_is_read_only(bs)) {
670             error_report("Opening rbd snapshots without an explicit "
671                          "read-only=on option is deprecated. Future versions "
672                          "will refuse to open the image instead of "
673                          "automatically marking the image read-only.");
674             r = bdrv_set_read_only(bs, true, &local_err);
675             if (r < 0) {
676                 error_propagate(errp, local_err);
677                 goto failed_open;
678             }
679         }
680     }
681 
682     qemu_opts_del(opts);
683     return 0;
684 
685 failed_open:
686     rados_ioctx_destroy(s->io_ctx);
687 failed_shutdown:
688     rados_shutdown(s->cluster);
689     g_free(s->snap);
690     g_free(s->image_name);
691 failed_opts:
692     qemu_opts_del(opts);
693     g_free(mon_host);
694 exit:
695     return r;
696 }
697 
698 
699 /* Since RBD is currently always opened R/W via the API,
700  * we just need to check if we are using a snapshot or not, in
701  * order to determine if we will allow it to be R/W */
702 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
703                                    BlockReopenQueue *queue, Error **errp)
704 {
705     BDRVRBDState *s = state->bs->opaque;
706     int ret = 0;
707 
708     if (s->snap && state->flags & BDRV_O_RDWR) {
709         error_setg(errp,
710                    "Cannot change node '%s' to r/w when using RBD snapshot",
711                    bdrv_get_device_or_node_name(state->bs));
712         ret = -EINVAL;
713     }
714 
715     return ret;
716 }
717 
/*
 * Close the image of @bs and release everything qemu_rbd_open() set up:
 * the image handle, the pool I/O context, the copied snapshot and image
 * names, and finally the cluster handle.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
}
728 
/* AIOCB description passed to qemu_aio_get() to allocate an RBDAIOCB. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
732 
733 static void rbd_finish_bh(void *opaque)
734 {
735     RADOSCB *rcb = opaque;
736     qemu_rbd_complete_aio(rcb);
737 }
738 
739 /*
740  * This is the callback function for rbd_aio_read and _write
741  *
742  * Note: this function is being called from a non qemu thread so
743  * we need to be careful about what we do here. Generally we only
744  * schedule a BH, and do the rest of the io completion handling
745  * from rbd_finish_bh() which runs in a qemu context.
746  */
747 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
748 {
749     RBDAIOCB *acb = rcb->acb;
750 
751     rcb->ret = rbd_aio_get_return_value(c);
752     rbd_aio_release(c);
753 
754     aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
755                             rbd_finish_bh, rcb);
756 }
757 
/*
 * Submit an asynchronous discard of @len bytes at offset @off.
 *
 * Forwards to rbd_aio_discard() when LIBRBD_SUPPORTS_DISCARD is defined;
 * otherwise returns -ENOTSUP.
 */
static int rbd_aio_discard_wrapper(rbd_image_t image,
                                   uint64_t off,
                                   uint64_t len,
                                   rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_DISCARD
    return rbd_aio_discard(image, off, len, comp);
#else
    return -ENOTSUP;
#endif
}
769 
/*
 * Submit an asynchronous flush of @image.
 *
 * Forwards to rbd_aio_flush() when LIBRBD_SUPPORTS_AIO_FLUSH is defined;
 * otherwise returns -ENOTSUP.
 */
static int rbd_aio_flush_wrapper(rbd_image_t image,
                                 rbd_completion_t comp)
{
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    return rbd_aio_flush(image, comp);
#else
    return -ENOTSUP;
#endif
}
779 
780 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
781                                  int64_t off,
782                                  QEMUIOVector *qiov,
783                                  int64_t size,
784                                  BlockCompletionFunc *cb,
785                                  void *opaque,
786                                  RBDAIOCmd cmd)
787 {
788     RBDAIOCB *acb;
789     RADOSCB *rcb = NULL;
790     rbd_completion_t c;
791     int r;
792 
793     BDRVRBDState *s = bs->opaque;
794 
795     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
796     acb->cmd = cmd;
797     acb->qiov = qiov;
798     assert(!qiov || qiov->size == size);
799 
800     rcb = g_new(RADOSCB, 1);
801 
802     if (!LIBRBD_USE_IOVEC) {
803         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
804             acb->bounce = NULL;
805         } else {
806             acb->bounce = qemu_try_blockalign(bs, qiov->size);
807             if (acb->bounce == NULL) {
808                 goto failed;
809             }
810         }
811         if (cmd == RBD_AIO_WRITE) {
812             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
813         }
814         rcb->buf = acb->bounce;
815     }
816 
817     acb->ret = 0;
818     acb->error = 0;
819     acb->s = s;
820 
821     rcb->acb = acb;
822     rcb->s = acb->s;
823     rcb->size = size;
824     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
825     if (r < 0) {
826         goto failed;
827     }
828 
829     switch (cmd) {
830     case RBD_AIO_WRITE:
831 #ifdef LIBRBD_SUPPORTS_IOVEC
832             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
833 #else
834             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
835 #endif
836         break;
837     case RBD_AIO_READ:
838 #ifdef LIBRBD_SUPPORTS_IOVEC
839             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
840 #else
841             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
842 #endif
843         break;
844     case RBD_AIO_DISCARD:
845         r = rbd_aio_discard_wrapper(s->image, off, size, c);
846         break;
847     case RBD_AIO_FLUSH:
848         r = rbd_aio_flush_wrapper(s->image, c);
849         break;
850     default:
851         r = -EINVAL;
852     }
853 
854     if (r < 0) {
855         goto failed_completion;
856     }
857     return &acb->common;
858 
859 failed_completion:
860     rbd_aio_release(c);
861 failed:
862     g_free(rcb);
863     if (!LIBRBD_USE_IOVEC) {
864         qemu_vfree(acb->bounce);
865     }
866 
867     qemu_aio_unref(acb);
868     return NULL;
869 }
870 
871 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
872                                       int64_t sector_num,
873                                       QEMUIOVector *qiov,
874                                       int nb_sectors,
875                                       BlockCompletionFunc *cb,
876                                       void *opaque)
877 {
878     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
879                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
880                          RBD_AIO_READ);
881 }
882 
883 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
884                                        int64_t sector_num,
885                                        QEMUIOVector *qiov,
886                                        int nb_sectors,
887                                        BlockCompletionFunc *cb,
888                                        void *opaque)
889 {
890     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
891                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
892                          RBD_AIO_WRITE);
893 }
894 
895 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
/*
 * Start an asynchronous flush of @bs; only built when
 * LIBRBD_SUPPORTS_AIO_FLUSH is defined.  A flush has no byte range and
 * no data, hence the zero offset/size and the NULL I/O vector.
 */
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
}
902 
903 #else
904 
/*
 * Synchronous flush, built when LIBRBD_SUPPORTS_AIO_FLUSH is not defined.
 * Returns the result of rbd_flush() if librbd is at least 0.1.1, and 0
 * without doing anything otherwise.
 */
static int qemu_rbd_co_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *s = bs->opaque;
    return rbd_flush(s->image);
#else
    return 0;
#endif
}
915 #endif
916 
917 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
918 {
919     BDRVRBDState *s = bs->opaque;
920     rbd_image_info_t info;
921     int r;
922 
923     r = rbd_stat(s->image, &info, sizeof(info));
924     if (r < 0) {
925         return r;
926     }
927 
928     bdi->cluster_size = info.obj_size;
929     return 0;
930 }
931 
932 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
933 {
934     BDRVRBDState *s = bs->opaque;
935     rbd_image_info_t info;
936     int r;
937 
938     r = rbd_stat(s->image, &info, sizeof(info));
939     if (r < 0) {
940         return r;
941     }
942 
943     return info.size;
944 }
945 
946 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset,
947                              PreallocMode prealloc, Error **errp)
948 {
949     BDRVRBDState *s = bs->opaque;
950     int r;
951 
952     if (prealloc != PREALLOC_MODE_OFF) {
953         error_setg(errp, "Unsupported preallocation mode '%s'",
954                    PreallocMode_str(prealloc));
955         return -ENOTSUP;
956     }
957 
958     r = rbd_resize(s->image, offset);
959     if (r < 0) {
960         error_setg_errno(errp, -r, "Failed to resize file");
961         return r;
962     }
963 
964     return 0;
965 }
966 
967 static int qemu_rbd_snap_create(BlockDriverState *bs,
968                                 QEMUSnapshotInfo *sn_info)
969 {
970     BDRVRBDState *s = bs->opaque;
971     int r;
972 
973     if (sn_info->name[0] == '\0') {
974         return -EINVAL; /* we need a name for rbd snapshots */
975     }
976 
977     /*
978      * rbd snapshots are using the name as the user controlled unique identifier
979      * we can't use the rbd snapid for that purpose, as it can't be set
980      */
981     if (sn_info->id_str[0] != '\0' &&
982         strcmp(sn_info->id_str, sn_info->name) != 0) {
983         return -EINVAL;
984     }
985 
986     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
987         return -ERANGE;
988     }
989 
990     r = rbd_snap_create(s->image, sn_info->name);
991     if (r < 0) {
992         error_report("failed to create snap: %s", strerror(-r));
993         return r;
994     }
995 
996     return 0;
997 }
998 
999 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1000                                 const char *snapshot_id,
1001                                 const char *snapshot_name,
1002                                 Error **errp)
1003 {
1004     BDRVRBDState *s = bs->opaque;
1005     int r;
1006 
1007     if (!snapshot_name) {
1008         error_setg(errp, "rbd need a valid snapshot name");
1009         return -EINVAL;
1010     }
1011 
1012     /* If snapshot_id is specified, it must be equal to name, see
1013        qemu_rbd_snap_list() */
1014     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1015         error_setg(errp,
1016                    "rbd do not support snapshot id, it should be NULL or "
1017                    "equal to snapshot name");
1018         return -EINVAL;
1019     }
1020 
1021     r = rbd_snap_remove(s->image, snapshot_name);
1022     if (r < 0) {
1023         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1024     }
1025     return r;
1026 }
1027 
/*
 * Roll the image of @bs back to the snapshot @snapshot_name.
 * Returns the value of rbd_snap_rollback() unchanged.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *s = bs->opaque;

    return rbd_snap_rollback(s->image, snapshot_name);
}
1035 
1036 static int qemu_rbd_snap_list(BlockDriverState *bs,
1037                               QEMUSnapshotInfo **psn_tab)
1038 {
1039     BDRVRBDState *s = bs->opaque;
1040     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1041     int i, snap_count;
1042     rbd_snap_info_t *snaps;
1043     int max_snaps = RBD_MAX_SNAPS;
1044 
1045     do {
1046         snaps = g_new(rbd_snap_info_t, max_snaps);
1047         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1048         if (snap_count <= 0) {
1049             g_free(snaps);
1050         }
1051     } while (snap_count == -ERANGE);
1052 
1053     if (snap_count <= 0) {
1054         goto done;
1055     }
1056 
1057     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1058 
1059     for (i = 0; i < snap_count; i++) {
1060         const char *snap_name = snaps[i].name;
1061 
1062         sn_info = sn_tab + i;
1063         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1064         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1065 
1066         sn_info->vm_state_size = snaps[i].size;
1067         sn_info->date_sec = 0;
1068         sn_info->date_nsec = 0;
1069         sn_info->vm_clock_nsec = 0;
1070     }
1071     rbd_snap_list_end(snaps);
1072     g_free(snaps);
1073 
1074  done:
1075     *psn_tab = sn_tab;
1076     return snap_count;
1077 }
1078 
#ifdef LIBRBD_SUPPORTS_DISCARD
/*
 * Submit an asynchronous discard of @bytes bytes starting at @offset.
 *
 * The request is forwarded to rbd_start_aio() with no data buffer and the
 * RBD_AIO_DISCARD command; @cb and @opaque are passed along unchanged.
 * Returns whatever rbd_start_aio() returns.
 */
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
                                         int64_t offset,
                                         int bytes,
                                         BlockCompletionFunc *cb,
                                         void *opaque)
{
    BlockAIOCB *acb;

    acb = rbd_start_aio(bs, offset, NULL, bytes, cb, opaque, RBD_AIO_DISCARD);
    return acb;
}
#endif
1090 
#ifdef LIBRBD_SUPPORTS_INVALIDATE
/*
 * Ask librbd to invalidate its cache for the image behind @bs.
 *
 * A negative result from rbd_invalidate_cache() is reported through @errp;
 * nothing is returned to the caller.
 */
static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
                                      Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int ret;

    ret = rbd_invalidate_cache(s->image);
    if (ret >= 0) {
        return;
    }

    error_setg_errno(errp, -ret, "Failed to invalidate the cache");
}
#endif
1102 
1103 static QemuOptsList qemu_rbd_create_opts = {
1104     .name = "rbd-create-opts",
1105     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1106     .desc = {
1107         {
1108             .name = BLOCK_OPT_SIZE,
1109             .type = QEMU_OPT_SIZE,
1110             .help = "Virtual disk size"
1111         },
1112         {
1113             .name = BLOCK_OPT_CLUSTER_SIZE,
1114             .type = QEMU_OPT_SIZE,
1115             .help = "RBD object size"
1116         },
1117         {
1118             .name = "password-secret",
1119             .type = QEMU_OPT_STRING,
1120             .help = "ID of secret providing the password",
1121         },
1122         { /* end of list */ }
1123     }
1124 };
1125 
1126 static BlockDriver bdrv_rbd = {
1127     .format_name            = "rbd",
1128     .instance_size          = sizeof(BDRVRBDState),
1129     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1130     .bdrv_file_open         = qemu_rbd_open,
1131     .bdrv_close             = qemu_rbd_close,
1132     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1133     .bdrv_create            = qemu_rbd_create,
1134     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1135     .bdrv_get_info          = qemu_rbd_getinfo,
1136     .create_opts            = &qemu_rbd_create_opts,
1137     .bdrv_getlength         = qemu_rbd_getlength,
1138     .bdrv_truncate          = qemu_rbd_truncate,
1139     .protocol_name          = "rbd",
1140 
1141     .bdrv_aio_readv         = qemu_rbd_aio_readv,
1142     .bdrv_aio_writev        = qemu_rbd_aio_writev,
1143 
1144 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1145     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1146 #else
1147     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1148 #endif
1149 
1150 #ifdef LIBRBD_SUPPORTS_DISCARD
1151     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1152 #endif
1153 
1154     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1155     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1156     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1157     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1158 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1159     .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
1160 #endif
1161 };
1162 
1163 static void bdrv_rbd_init(void)
1164 {
1165     bdrv_register(&bdrv_rbd);
1166 }
1167 
1168 block_init(bdrv_rbd_init);
1169