xref: /qemu/block/rbd.c (revision 6f0dd6c5)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "qapi/qmp/qstring.h"
26 #include "qapi/qmp/qdict.h"
27 #include "qapi/qmp/qjson.h"
28 #include "qapi/qmp/qlist.h"
29 #include "qapi/qobject-input-visitor.h"
30 #include "qapi/qapi-visit-block-core.h"
31 
32 /*
33  * When specifying the image filename use:
34  *
35  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
36  *
37  * poolname must be the name of an existing rados pool.
38  *
39  * devicename is the name of the rbd image.
40  *
41  * Each option given is used to configure rados, and may be any valid
42  * Ceph option, "id", or "conf".
43  *
44  * The "id" option indicates what user we should authenticate as to
45  * the Ceph cluster.  If it is excluded we will use the Ceph default
46  * (normally 'admin').
47  *
48  * The "conf" option specifies a Ceph configuration file to read.  If
49  * it is not specified, we will read from the default Ceph locations
50  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
51  * file, specify conf=/dev/null.
52  *
53  * Configuration values containing :, @, or = can be escaped with a
54  * leading "\".
55  */
56 
/*
 * Compile-time feature selection based on the librbd headers in use.
 */

/* rbd_aio_discard added in 0.1.2 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file, and OBJ_MAX_SIZE is not referenced there either -- confirm
 * whether this macro is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/*
 * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
 *
 * LIBRBD_USE_IOVEC is always defined (as 0 or 1) so that it can be tested
 * with a plain C "if" instead of the preprocessor.
 */
#ifdef LIBRBD_SUPPORTS_IOVEC
#define LIBRBD_USE_IOVEC 1
#else
#define LIBRBD_USE_IOVEC 0
#endif
74 
/* Kind of asynchronous request; rbd_start_aio() dispatches on this value. */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
81 
/*
 * Per-request state on the QEMU side.  Instances are obtained with
 * qemu_aio_get() using rbd_aiocb_info and released with qemu_aio_unref().
 */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB header (cb, opaque, bs) */
    int64_t ret;            /* result; a positive value is reported as 0 */
    QEMUIOVector *qiov;     /* caller's I/O vector, NULL for flush */
    char *bounce;           /* linear buffer, used when !LIBRBD_USE_IOVEC */
    RBDAIOCmd cmd;          /* which operation this request performs */
    int error;              /* set to 1 once a failure has been recorded */
    struct BDRVRBDState *s; /* driver state of the owning BlockDriverState */
} RBDAIOCB;
91 
/*
 * Context handed to librbd as the completion argument.  It is allocated in
 * rbd_start_aio() and freed in qemu_rbd_complete_aio() (or on the failure
 * path of rbd_start_aio()).
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* the QEMU request this completion belongs to */
    struct BDRVRBDState *s; /* copy of acb->s */
    int64_t size;           /* size of the request in bytes */
    char *buf;              /* acb->bounce; only set when !LIBRBD_USE_IOVEC */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
99 
/* Driver-private state stored in BlockDriverState.opaque. */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle created by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* I/O context for the pool */
    rbd_image_t image;      /* image handle from rbd_open() */
    char *image_name;       /* owned copy of the image name */
    char *snap;             /* owned copy of the snapshot name, or NULL */
    uint64_t image_size;    /* size from rbd_get_size(), updated on resize */
} BDRVRBDState;
108 
/*
 * Forward declaration: qemu_rbd_do_create() uses this function before its
 * definition further down in the file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
113 
/*
 * Cut the first token off @src, in place.
 *
 * The token ends at the first occurrence of @delim that is not preceded by
 * a backslash.  If such a delimiter exists it is overwritten with NUL and
 * *@p is set to the character following it; otherwise @src is left
 * untouched and *@p is set to NULL.
 *
 * Escape sequences are only skipped here, not removed; use
 * qemu_rbd_unescape() on the result for that.
 *
 * Returns @src, which now points to the NUL-terminated token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *cur = src;

    *p = NULL;

    while (*cur != '\0' && *cur != delim) {
        /* A backslash protects the character that follows it */
        if (cur[0] == '\\' && cur[1] != '\0') {
            cur++;
        }
        cur++;
    }

    if (*cur == delim) {
        *p = cur + 1;
        *cur = '\0';
    }
    return src;
}
134 
/*
 * Remove backslash escapes from @src in place.
 *
 * Every backslash that is followed by another character is dropped and the
 * following character is kept literally.  A backslash that is the very last
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *rd = src;
    char *wr = src;

    while (*rd != '\0') {
        if (rd[0] == '\\' && rd[1] != '\0') {
            rd++;
        }
        *wr++ = *rd++;
    }
    *wr = '\0';
}
147 
148 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
149                                     Error **errp)
150 {
151     const char *start;
152     char *p, *buf;
153     QList *keypairs = NULL;
154     char *found_str;
155 
156     if (!strstart(filename, "rbd:", &start)) {
157         error_setg(errp, "File name must start with 'rbd:'");
158         return;
159     }
160 
161     buf = g_strdup(start);
162     p = buf;
163 
164     found_str = qemu_rbd_next_tok(p, '/', &p);
165     if (!p) {
166         error_setg(errp, "Pool name is required");
167         goto done;
168     }
169     qemu_rbd_unescape(found_str);
170     qdict_put_str(options, "pool", found_str);
171 
172     if (strchr(p, '@')) {
173         found_str = qemu_rbd_next_tok(p, '@', &p);
174         qemu_rbd_unescape(found_str);
175         qdict_put_str(options, "image", found_str);
176 
177         found_str = qemu_rbd_next_tok(p, ':', &p);
178         qemu_rbd_unescape(found_str);
179         qdict_put_str(options, "snapshot", found_str);
180     } else {
181         found_str = qemu_rbd_next_tok(p, ':', &p);
182         qemu_rbd_unescape(found_str);
183         qdict_put_str(options, "image", found_str);
184     }
185     if (!p) {
186         goto done;
187     }
188 
189     /* The following are essentially all key/value pairs, and we treat
190      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
191     while (p) {
192         char *name, *value;
193         name = qemu_rbd_next_tok(p, '=', &p);
194         if (!p) {
195             error_setg(errp, "conf option %s has no value", name);
196             break;
197         }
198 
199         qemu_rbd_unescape(name);
200 
201         value = qemu_rbd_next_tok(p, ':', &p);
202         qemu_rbd_unescape(value);
203 
204         if (!strcmp(name, "conf")) {
205             qdict_put_str(options, "conf", value);
206         } else if (!strcmp(name, "id")) {
207             qdict_put_str(options, "user", value);
208         } else {
209             /*
210              * We pass these internally to qemu_rbd_set_keypairs(), so
211              * we can get away with the simpler list of [ "key1",
212              * "value1", "key2", "value2" ] rather than a raw dict
213              * { "key1": "value1", "key2": "value2" } where we can't
214              * guarantee order, or even a more correct but complex
215              * [ { "key1": "value1" }, { "key2": "value2" } ]
216              */
217             if (!keypairs) {
218                 keypairs = qlist_new();
219             }
220             qlist_append_str(keypairs, name);
221             qlist_append_str(keypairs, value);
222         }
223     }
224 
225     if (keypairs) {
226         qdict_put(options, "=keyvalue-pairs",
227                   qobject_to_json(QOBJECT(keypairs)));
228     }
229 
230 done:
231     g_free(buf);
232     qobject_unref(keypairs);
233     return;
234 }
235 
236 
237 static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
238 {
239     /* XXX Does RBD support AIO on less than 512-byte alignment? */
240     bs->bl.request_alignment = 512;
241 }
242 
243 
244 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
245                              Error **errp)
246 {
247     char *key, *acr;
248     int r;
249     GString *accu;
250     RbdAuthModeList *auth;
251 
252     if (opts->key_secret) {
253         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
254         if (!key) {
255             return -EIO;
256         }
257         r = rados_conf_set(cluster, "key", key);
258         g_free(key);
259         if (r < 0) {
260             error_setg_errno(errp, -r, "Could not set 'key'");
261             return r;
262         }
263     }
264 
265     if (opts->has_auth_client_required) {
266         accu = g_string_new("");
267         for (auth = opts->auth_client_required; auth; auth = auth->next) {
268             if (accu->str[0]) {
269                 g_string_append_c(accu, ';');
270             }
271             g_string_append(accu, RbdAuthMode_str(auth->value));
272         }
273         acr = g_string_free(accu, FALSE);
274         r = rados_conf_set(cluster, "auth_client_required", acr);
275         g_free(acr);
276         if (r < 0) {
277             error_setg_errno(errp, -r,
278                              "Could not set 'auth_client_required'");
279             return r;
280         }
281     }
282 
283     return 0;
284 }
285 
286 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
287                                  Error **errp)
288 {
289     QList *keypairs;
290     QString *name;
291     QString *value;
292     const char *key;
293     size_t remaining;
294     int ret = 0;
295 
296     if (!keypairs_json) {
297         return ret;
298     }
299     keypairs = qobject_to(QList,
300                           qobject_from_json(keypairs_json, &error_abort));
301     remaining = qlist_size(keypairs) / 2;
302     assert(remaining);
303 
304     while (remaining--) {
305         name = qobject_to(QString, qlist_pop(keypairs));
306         value = qobject_to(QString, qlist_pop(keypairs));
307         assert(name && value);
308         key = qstring_get_str(name);
309 
310         ret = rados_conf_set(cluster, key, qstring_get_str(value));
311         qobject_unref(value);
312         if (ret < 0) {
313             error_setg_errno(errp, -ret, "invalid conf option %s", key);
314             qobject_unref(name);
315             ret = -EINVAL;
316             break;
317         }
318         qobject_unref(name);
319     }
320 
321     qobject_unref(keypairs);
322     return ret;
323 }
324 
325 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
326 {
327     if (LIBRBD_USE_IOVEC) {
328         RBDAIOCB *acb = rcb->acb;
329         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
330                    acb->qiov->size - offs);
331     } else {
332         memset(rcb->buf + offs, 0, rcb->size - offs);
333     }
334 }
335 
/*
 * Description of the string-valued runtime options of the "rbd" driver.
 * NOTE(review): no user of this list is visible in this part of the file.
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "pool",
            .type = QEMU_OPT_STRING,
            .help = "Rados pool name",
        },
        {
            .name = "image",
            .type = QEMU_OPT_STRING,
            .help = "Image name in the pool",
        },
        {
            .name = "conf",
            .type = QEMU_OPT_STRING,
            .help = "Rados config file location",
        },
        {
            .name = "snapshot",
            .type = QEMU_OPT_STRING,
            .help = "Ceph snapshot name",
        },
        {
            /* maps to 'id' in rados_create() */
            .name = "user",
            .type = QEMU_OPT_STRING,
            .help = "Rados id name",
        },
        /*
         * server.* extracted manually, see qemu_rbd_mon_host()
         */
        { /* end of list */ }
    },
};
372 
373 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
374 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
375                               const char *keypairs, const char *password_secret,
376                               Error **errp)
377 {
378     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
379     rados_t cluster;
380     rados_ioctx_t io_ctx;
381     int obj_order = 0;
382     int ret;
383 
384     assert(options->driver == BLOCKDEV_DRIVER_RBD);
385     if (opts->location->has_snapshot) {
386         error_setg(errp, "Can't use snapshot name for image creation");
387         return -EINVAL;
388     }
389 
390     if (opts->has_cluster_size) {
391         int64_t objsize = opts->cluster_size;
392         if ((objsize - 1) & objsize) {    /* not a power of 2? */
393             error_setg(errp, "obj size needs to be power of 2");
394             return -EINVAL;
395         }
396         if (objsize < 4096) {
397             error_setg(errp, "obj size too small");
398             return -EINVAL;
399         }
400         obj_order = ctz32(objsize);
401     }
402 
403     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
404                            password_secret, errp);
405     if (ret < 0) {
406         return ret;
407     }
408 
409     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
410     if (ret < 0) {
411         error_setg_errno(errp, -ret, "error rbd create");
412         goto out;
413     }
414 
415     ret = 0;
416 out:
417     rados_ioctx_destroy(io_ctx);
418     rados_shutdown(cluster);
419     return ret;
420 }
421 
422 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
423 {
424     return qemu_rbd_do_create(options, NULL, NULL, errp);
425 }
426 
427 static int coroutine_fn qemu_rbd_co_create_opts(const char *filename,
428                                                 QemuOpts *opts,
429                                                 Error **errp)
430 {
431     BlockdevCreateOptions *create_options;
432     BlockdevCreateOptionsRbd *rbd_opts;
433     BlockdevOptionsRbd *loc;
434     Error *local_err = NULL;
435     const char *keypairs, *password_secret;
436     QDict *options = NULL;
437     int ret = 0;
438 
439     create_options = g_new0(BlockdevCreateOptions, 1);
440     create_options->driver = BLOCKDEV_DRIVER_RBD;
441     rbd_opts = &create_options->u.rbd;
442 
443     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
444 
445     password_secret = qemu_opt_get(opts, "password-secret");
446 
447     /* Read out options */
448     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
449                               BDRV_SECTOR_SIZE);
450     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
451                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
452     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
453 
454     options = qdict_new();
455     qemu_rbd_parse_filename(filename, options, &local_err);
456     if (local_err) {
457         ret = -EINVAL;
458         error_propagate(errp, local_err);
459         goto exit;
460     }
461 
462     /*
463      * Caution: while qdict_get_try_str() is fine, getting non-string
464      * types would require more care.  When @options come from -blockdev
465      * or blockdev_add, its members are typed according to the QAPI
466      * schema, but when they come from -drive, they're all QString.
467      */
468     loc = rbd_opts->location;
469     loc->pool     = g_strdup(qdict_get_try_str(options, "pool"));
470     loc->conf     = g_strdup(qdict_get_try_str(options, "conf"));
471     loc->has_conf = !!loc->conf;
472     loc->user     = g_strdup(qdict_get_try_str(options, "user"));
473     loc->has_user = !!loc->user;
474     loc->image    = g_strdup(qdict_get_try_str(options, "image"));
475     keypairs      = qdict_get_try_str(options, "=keyvalue-pairs");
476 
477     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
478     if (ret < 0) {
479         goto exit;
480     }
481 
482 exit:
483     qobject_unref(options);
484     qapi_free_BlockdevCreateOptions(create_options);
485     return ret;
486 }
487 
488 /*
489  * This aio completion is being called from rbd_finish_bh() and runs in qemu
490  * BH context.
491  */
492 static void qemu_rbd_complete_aio(RADOSCB *rcb)
493 {
494     RBDAIOCB *acb = rcb->acb;
495     int64_t r;
496 
497     r = rcb->ret;
498 
499     if (acb->cmd != RBD_AIO_READ) {
500         if (r < 0) {
501             acb->ret = r;
502             acb->error = 1;
503         } else if (!acb->error) {
504             acb->ret = rcb->size;
505         }
506     } else {
507         if (r < 0) {
508             qemu_rbd_memset(rcb, 0);
509             acb->ret = r;
510             acb->error = 1;
511         } else if (r < rcb->size) {
512             qemu_rbd_memset(rcb, r);
513             if (!acb->error) {
514                 acb->ret = rcb->size;
515             }
516         } else if (!acb->error) {
517             acb->ret = r;
518         }
519     }
520 
521     g_free(rcb);
522 
523     if (!LIBRBD_USE_IOVEC) {
524         if (acb->cmd == RBD_AIO_READ) {
525             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
526         }
527         qemu_vfree(acb->bounce);
528     }
529 
530     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
531 
532     qemu_aio_unref(acb);
533 }
534 
535 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
536 {
537     const char **vals;
538     const char *host, *port;
539     char *rados_str;
540     InetSocketAddressBaseList *p;
541     int i, cnt;
542 
543     if (!opts->has_server) {
544         return NULL;
545     }
546 
547     for (cnt = 0, p = opts->server; p; p = p->next) {
548         cnt++;
549     }
550 
551     vals = g_new(const char *, cnt + 1);
552 
553     for (i = 0, p = opts->server; p; p = p->next, i++) {
554         host = p->value->host;
555         port = p->value->port;
556 
557         if (strchr(host, ':')) {
558             vals[i] = g_strdup_printf("[%s]:%s", host, port);
559         } else {
560             vals[i] = g_strdup_printf("%s:%s", host, port);
561         }
562     }
563     vals[i] = NULL;
564 
565     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
566     g_strfreev((char **)vals);
567     return rados_str;
568 }
569 
570 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
571                             BlockdevOptionsRbd *opts, bool cache,
572                             const char *keypairs, const char *secretid,
573                             Error **errp)
574 {
575     char *mon_host = NULL;
576     Error *local_err = NULL;
577     int r;
578 
579     if (secretid) {
580         if (opts->key_secret) {
581             error_setg(errp,
582                        "Legacy 'password-secret' clashes with 'key-secret'");
583             return -EINVAL;
584         }
585         opts->key_secret = g_strdup(secretid);
586         opts->has_key_secret = true;
587     }
588 
589     mon_host = qemu_rbd_mon_host(opts, &local_err);
590     if (local_err) {
591         error_propagate(errp, local_err);
592         r = -EINVAL;
593         goto failed_opts;
594     }
595 
596     r = rados_create(cluster, opts->user);
597     if (r < 0) {
598         error_setg_errno(errp, -r, "error initializing");
599         goto failed_opts;
600     }
601 
602     /* try default location when conf=NULL, but ignore failure */
603     r = rados_conf_read_file(*cluster, opts->conf);
604     if (opts->has_conf && r < 0) {
605         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
606         goto failed_shutdown;
607     }
608 
609     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
610     if (r < 0) {
611         goto failed_shutdown;
612     }
613 
614     if (mon_host) {
615         r = rados_conf_set(*cluster, "mon_host", mon_host);
616         if (r < 0) {
617             goto failed_shutdown;
618         }
619     }
620 
621     r = qemu_rbd_set_auth(*cluster, opts, errp);
622     if (r < 0) {
623         goto failed_shutdown;
624     }
625 
626     /*
627      * Fallback to more conservative semantics if setting cache
628      * options fails. Ignore errors from setting rbd_cache because the
629      * only possible error is that the option does not exist, and
630      * librbd defaults to no caching. If write through caching cannot
631      * be set up, fall back to no caching.
632      */
633     if (cache) {
634         rados_conf_set(*cluster, "rbd_cache", "true");
635     } else {
636         rados_conf_set(*cluster, "rbd_cache", "false");
637     }
638 
639     r = rados_connect(*cluster);
640     if (r < 0) {
641         error_setg_errno(errp, -r, "error connecting");
642         goto failed_shutdown;
643     }
644 
645     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
646     if (r < 0) {
647         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
648         goto failed_shutdown;
649     }
650 
651     return 0;
652 
653 failed_shutdown:
654     rados_shutdown(*cluster);
655 failed_opts:
656     g_free(mon_host);
657     return r;
658 }
659 
660 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
661                                     Error **errp)
662 {
663     Visitor *v;
664     Error *local_err = NULL;
665 
666     /* Convert the remaining options into a QAPI object */
667     v = qobject_input_visitor_new_flat_confused(options, errp);
668     if (!v) {
669         return -EINVAL;
670     }
671 
672     visit_type_BlockdevOptionsRbd(v, NULL, opts, &local_err);
673     visit_free(v);
674 
675     if (local_err) {
676         error_propagate(errp, local_err);
677         return -EINVAL;
678     }
679 
680     return 0;
681 }
682 
683 static int qemu_rbd_attempt_legacy_options(QDict *options,
684                                            BlockdevOptionsRbd **opts,
685                                            char **keypairs)
686 {
687     char *filename;
688     int r;
689 
690     filename = g_strdup(qdict_get_try_str(options, "filename"));
691     if (!filename) {
692         return -EINVAL;
693     }
694     qdict_del(options, "filename");
695 
696     qemu_rbd_parse_filename(filename, options, NULL);
697 
698     /* keypairs freed by caller */
699     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
700     if (*keypairs) {
701         qdict_del(options, "=keyvalue-pairs");
702     }
703 
704     r = qemu_rbd_convert_options(options, opts, NULL);
705 
706     g_free(filename);
707     return r;
708 }
709 
710 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
711                          Error **errp)
712 {
713     BDRVRBDState *s = bs->opaque;
714     BlockdevOptionsRbd *opts = NULL;
715     const QDictEntry *e;
716     Error *local_err = NULL;
717     char *keypairs, *secretid;
718     int r;
719 
720     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
721     if (keypairs) {
722         qdict_del(options, "=keyvalue-pairs");
723     }
724 
725     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
726     if (secretid) {
727         qdict_del(options, "password-secret");
728     }
729 
730     r = qemu_rbd_convert_options(options, &opts, &local_err);
731     if (local_err) {
732         /* If keypairs are present, that means some options are present in
733          * the modern option format.  Don't attempt to parse legacy option
734          * formats, as we won't support mixed usage. */
735         if (keypairs) {
736             error_propagate(errp, local_err);
737             goto out;
738         }
739 
740         /* If the initial attempt to convert and process the options failed,
741          * we may be attempting to open an image file that has the rbd options
742          * specified in the older format consisting of all key/value pairs
743          * encoded in the filename.  Go ahead and attempt to parse the
744          * filename, and see if we can pull out the required options. */
745         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
746         if (r < 0) {
747             /* Propagate the original error, not the legacy parsing fallback
748              * error, as the latter was just a best-effort attempt. */
749             error_propagate(errp, local_err);
750             goto out;
751         }
752         /* Take care whenever deciding to actually deprecate; once this ability
753          * is removed, we will not be able to open any images with legacy-styled
754          * backing image strings. */
755         warn_report("RBD options encoded in the filename as keyvalue pairs "
756                     "is deprecated");
757     }
758 
759     /* Remove the processed options from the QDict (the visitor processes
760      * _all_ options in the QDict) */
761     while ((e = qdict_first(options))) {
762         qdict_del(options, e->key);
763     }
764 
765     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
766                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
767     if (r < 0) {
768         goto out;
769     }
770 
771     s->snap = g_strdup(opts->snapshot);
772     s->image_name = g_strdup(opts->image);
773 
774     /* rbd_open is always r/w */
775     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
776     if (r < 0) {
777         error_setg_errno(errp, -r, "error reading header from %s",
778                          s->image_name);
779         goto failed_open;
780     }
781 
782     r = rbd_get_size(s->image, &s->image_size);
783     if (r < 0) {
784         error_setg_errno(errp, -r, "error getting image size from %s",
785                          s->image_name);
786         rbd_close(s->image);
787         goto failed_open;
788     }
789 
790     /* If we are using an rbd snapshot, we must be r/o, otherwise
791      * leave as-is */
792     if (s->snap != NULL) {
793         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
794         if (r < 0) {
795             rbd_close(s->image);
796             goto failed_open;
797         }
798     }
799 
800     r = 0;
801     goto out;
802 
803 failed_open:
804     rados_ioctx_destroy(s->io_ctx);
805     g_free(s->snap);
806     g_free(s->image_name);
807     rados_shutdown(s->cluster);
808 out:
809     qapi_free_BlockdevOptionsRbd(opts);
810     g_free(keypairs);
811     g_free(secretid);
812     return r;
813 }
814 
815 
816 /* Since RBD is currently always opened R/W via the API,
817  * we just need to check if we are using a snapshot or not, in
818  * order to determine if we will allow it to be R/W */
819 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
820                                    BlockReopenQueue *queue, Error **errp)
821 {
822     BDRVRBDState *s = state->bs->opaque;
823     int ret = 0;
824 
825     if (s->snap && state->flags & BDRV_O_RDWR) {
826         error_setg(errp,
827                    "Cannot change node '%s' to r/w when using RBD snapshot",
828                    bdrv_get_device_or_node_name(state->bs));
829         ret = -EINVAL;
830     }
831 
832     return ret;
833 }
834 
/*
 * Release everything qemu_rbd_open() set up in the driver state: the image
 * handle, the I/O context, the copied snapshot and image names, and finally
 * the cluster handle.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
}
845 
846 /* Resize the RBD image and update the 'image_size' with the current size */
847 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
848 {
849     BDRVRBDState *s = bs->opaque;
850     int r;
851 
852     r = rbd_resize(s->image, size);
853     if (r < 0) {
854         return r;
855     }
856 
857     s->image_size = size;
858 
859     return 0;
860 }
861 
/* AIOCB description passed to qemu_aio_get(); requests are RBDAIOCB sized. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
865 
/*
 * Bottom half scheduled by rbd_finish_aiocb(); @opaque is the RADOSCB of
 * the finished request.
 */
static void rbd_finish_bh(void *opaque)
{
    qemu_rbd_complete_aio(opaque);
}
871 
872 /*
873  * This is the callback function for rbd_aio_read and _write
874  *
875  * Note: this function is being called from a non qemu thread so
876  * we need to be careful about what we do here. Generally we only
877  * schedule a BH, and do the rest of the io completion handling
878  * from rbd_finish_bh() which runs in a qemu context.
879  */
880 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
881 {
882     RBDAIOCB *acb = rcb->acb;
883 
884     rcb->ret = rbd_aio_get_return_value(c);
885     rbd_aio_release(c);
886 
887     aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
888                             rbd_finish_bh, rcb);
889 }
890 
891 static int rbd_aio_discard_wrapper(rbd_image_t image,
892                                    uint64_t off,
893                                    uint64_t len,
894                                    rbd_completion_t comp)
895 {
896 #ifdef LIBRBD_SUPPORTS_DISCARD
897     return rbd_aio_discard(image, off, len, comp);
898 #else
899     return -ENOTSUP;
900 #endif
901 }
902 
903 static int rbd_aio_flush_wrapper(rbd_image_t image,
904                                  rbd_completion_t comp)
905 {
906 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
907     return rbd_aio_flush(image, comp);
908 #else
909     return -ENOTSUP;
910 #endif
911 }
912 
913 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
914                                  int64_t off,
915                                  QEMUIOVector *qiov,
916                                  int64_t size,
917                                  BlockCompletionFunc *cb,
918                                  void *opaque,
919                                  RBDAIOCmd cmd)
920 {
921     RBDAIOCB *acb;
922     RADOSCB *rcb = NULL;
923     rbd_completion_t c;
924     int r;
925 
926     BDRVRBDState *s = bs->opaque;
927 
928     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
929     acb->cmd = cmd;
930     acb->qiov = qiov;
931     assert(!qiov || qiov->size == size);
932 
933     rcb = g_new(RADOSCB, 1);
934 
935     if (!LIBRBD_USE_IOVEC) {
936         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
937             acb->bounce = NULL;
938         } else {
939             acb->bounce = qemu_try_blockalign(bs, qiov->size);
940             if (acb->bounce == NULL) {
941                 goto failed;
942             }
943         }
944         if (cmd == RBD_AIO_WRITE) {
945             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
946         }
947         rcb->buf = acb->bounce;
948     }
949 
950     acb->ret = 0;
951     acb->error = 0;
952     acb->s = s;
953 
954     rcb->acb = acb;
955     rcb->s = acb->s;
956     rcb->size = size;
957     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
958     if (r < 0) {
959         goto failed;
960     }
961 
962     switch (cmd) {
963     case RBD_AIO_WRITE: {
964         /*
965          * RBD APIs don't allow us to write more than actual size, so in order
966          * to support growing images, we resize the image before write
967          * operations that exceed the current size.
968          */
969         if (off + size > s->image_size) {
970             r = qemu_rbd_resize(bs, off + size);
971             if (r < 0) {
972                 goto failed_completion;
973             }
974         }
975 #ifdef LIBRBD_SUPPORTS_IOVEC
976             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
977 #else
978             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
979 #endif
980         break;
981     }
982     case RBD_AIO_READ:
983 #ifdef LIBRBD_SUPPORTS_IOVEC
984             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
985 #else
986             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
987 #endif
988         break;
989     case RBD_AIO_DISCARD:
990         r = rbd_aio_discard_wrapper(s->image, off, size, c);
991         break;
992     case RBD_AIO_FLUSH:
993         r = rbd_aio_flush_wrapper(s->image, c);
994         break;
995     default:
996         r = -EINVAL;
997     }
998 
999     if (r < 0) {
1000         goto failed_completion;
1001     }
1002     return &acb->common;
1003 
1004 failed_completion:
1005     rbd_aio_release(c);
1006 failed:
1007     g_free(rcb);
1008     if (!LIBRBD_USE_IOVEC) {
1009         qemu_vfree(acb->bounce);
1010     }
1011 
1012     qemu_aio_unref(acb);
1013     return NULL;
1014 }
1015 
1016 static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
1017                                        uint64_t offset, uint64_t bytes,
1018                                        QEMUIOVector *qiov, int flags,
1019                                        BlockCompletionFunc *cb,
1020                                        void *opaque)
1021 {
1022     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1023                          RBD_AIO_READ);
1024 }
1025 
1026 static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1027                                         uint64_t offset, uint64_t bytes,
1028                                         QEMUIOVector *qiov, int flags,
1029                                         BlockCompletionFunc *cb,
1030                                         void *opaque)
1031 {
1032     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1033                          RBD_AIO_WRITE);
1034 }
1035 
1036 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1037 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1038                                       BlockCompletionFunc *cb,
1039                                       void *opaque)
1040 {
1041     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1042 }
1043 
1044 #else
1045 
1046 static int qemu_rbd_co_flush(BlockDriverState *bs)
1047 {
1048 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1049     /* rbd_flush added in 0.1.1 */
1050     BDRVRBDState *s = bs->opaque;
1051     return rbd_flush(s->image);
1052 #else
1053     return 0;
1054 #endif
1055 }
1056 #endif
1057 
1058 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1059 {
1060     BDRVRBDState *s = bs->opaque;
1061     rbd_image_info_t info;
1062     int r;
1063 
1064     r = rbd_stat(s->image, &info, sizeof(info));
1065     if (r < 0) {
1066         return r;
1067     }
1068 
1069     bdi->cluster_size = info.obj_size;
1070     return 0;
1071 }
1072 
1073 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1074 {
1075     BDRVRBDState *s = bs->opaque;
1076     rbd_image_info_t info;
1077     int r;
1078 
1079     r = rbd_stat(s->image, &info, sizeof(info));
1080     if (r < 0) {
1081         return r;
1082     }
1083 
1084     return info.size;
1085 }
1086 
1087 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1088                                              int64_t offset,
1089                                              PreallocMode prealloc,
1090                                              Error **errp)
1091 {
1092     int r;
1093 
1094     if (prealloc != PREALLOC_MODE_OFF) {
1095         error_setg(errp, "Unsupported preallocation mode '%s'",
1096                    PreallocMode_str(prealloc));
1097         return -ENOTSUP;
1098     }
1099 
1100     r = qemu_rbd_resize(bs, offset);
1101     if (r < 0) {
1102         error_setg_errno(errp, -r, "Failed to resize file");
1103         return r;
1104     }
1105 
1106     return 0;
1107 }
1108 
1109 static int qemu_rbd_snap_create(BlockDriverState *bs,
1110                                 QEMUSnapshotInfo *sn_info)
1111 {
1112     BDRVRBDState *s = bs->opaque;
1113     int r;
1114 
1115     if (sn_info->name[0] == '\0') {
1116         return -EINVAL; /* we need a name for rbd snapshots */
1117     }
1118 
1119     /*
1120      * rbd snapshots are using the name as the user controlled unique identifier
1121      * we can't use the rbd snapid for that purpose, as it can't be set
1122      */
1123     if (sn_info->id_str[0] != '\0' &&
1124         strcmp(sn_info->id_str, sn_info->name) != 0) {
1125         return -EINVAL;
1126     }
1127 
1128     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1129         return -ERANGE;
1130     }
1131 
1132     r = rbd_snap_create(s->image, sn_info->name);
1133     if (r < 0) {
1134         error_report("failed to create snap: %s", strerror(-r));
1135         return r;
1136     }
1137 
1138     return 0;
1139 }
1140 
1141 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1142                                 const char *snapshot_id,
1143                                 const char *snapshot_name,
1144                                 Error **errp)
1145 {
1146     BDRVRBDState *s = bs->opaque;
1147     int r;
1148 
1149     if (!snapshot_name) {
1150         error_setg(errp, "rbd need a valid snapshot name");
1151         return -EINVAL;
1152     }
1153 
1154     /* If snapshot_id is specified, it must be equal to name, see
1155        qemu_rbd_snap_list() */
1156     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1157         error_setg(errp,
1158                    "rbd do not support snapshot id, it should be NULL or "
1159                    "equal to snapshot name");
1160         return -EINVAL;
1161     }
1162 
1163     r = rbd_snap_remove(s->image, snapshot_name);
1164     if (r < 0) {
1165         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1166     }
1167     return r;
1168 }
1169 
1170 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1171                                   const char *snapshot_name)
1172 {
1173     BDRVRBDState *s = bs->opaque;
1174 
1175     return rbd_snap_rollback(s->image, snapshot_name);
1176 }
1177 
1178 static int qemu_rbd_snap_list(BlockDriverState *bs,
1179                               QEMUSnapshotInfo **psn_tab)
1180 {
1181     BDRVRBDState *s = bs->opaque;
1182     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1183     int i, snap_count;
1184     rbd_snap_info_t *snaps;
1185     int max_snaps = RBD_MAX_SNAPS;
1186 
1187     do {
1188         snaps = g_new(rbd_snap_info_t, max_snaps);
1189         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1190         if (snap_count <= 0) {
1191             g_free(snaps);
1192         }
1193     } while (snap_count == -ERANGE);
1194 
1195     if (snap_count <= 0) {
1196         goto done;
1197     }
1198 
1199     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1200 
1201     for (i = 0; i < snap_count; i++) {
1202         const char *snap_name = snaps[i].name;
1203 
1204         sn_info = sn_tab + i;
1205         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1206         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1207 
1208         sn_info->vm_state_size = snaps[i].size;
1209         sn_info->date_sec = 0;
1210         sn_info->date_nsec = 0;
1211         sn_info->vm_clock_nsec = 0;
1212     }
1213     rbd_snap_list_end(snaps);
1214     g_free(snaps);
1215 
1216  done:
1217     *psn_tab = sn_tab;
1218     return snap_count;
1219 }
1220 
1221 #ifdef LIBRBD_SUPPORTS_DISCARD
1222 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1223                                          int64_t offset,
1224                                          int bytes,
1225                                          BlockCompletionFunc *cb,
1226                                          void *opaque)
1227 {
1228     return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1229                          RBD_AIO_DISCARD);
1230 }
1231 #endif
1232 
1233 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1234 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1235                                                       Error **errp)
1236 {
1237     BDRVRBDState *s = bs->opaque;
1238     int r = rbd_invalidate_cache(s->image);
1239     if (r < 0) {
1240         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1241     }
1242 }
1243 #endif
1244 
1245 static QemuOptsList qemu_rbd_create_opts = {
1246     .name = "rbd-create-opts",
1247     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1248     .desc = {
1249         {
1250             .name = BLOCK_OPT_SIZE,
1251             .type = QEMU_OPT_SIZE,
1252             .help = "Virtual disk size"
1253         },
1254         {
1255             .name = BLOCK_OPT_CLUSTER_SIZE,
1256             .type = QEMU_OPT_SIZE,
1257             .help = "RBD object size"
1258         },
1259         {
1260             .name = "password-secret",
1261             .type = QEMU_OPT_STRING,
1262             .help = "ID of secret providing the password",
1263         },
1264         { /* end of list */ }
1265     }
1266 };
1267 
/*
 * NULL-terminated list of option names that the driver publishes through
 * bdrv_rbd.strong_runtime_opts.
 *
 * NOTE(review): the trailing dot in "server." presumably makes the entry
 * match every option with that prefix -- confirm against the block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1279 
1280 static BlockDriver bdrv_rbd = {
1281     .format_name            = "rbd",
1282     .instance_size          = sizeof(BDRVRBDState),
1283     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1284     .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1285     .bdrv_file_open         = qemu_rbd_open,
1286     .bdrv_close             = qemu_rbd_close,
1287     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1288     .bdrv_co_create         = qemu_rbd_co_create,
1289     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1290     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1291     .bdrv_has_zero_init_truncate = bdrv_has_zero_init_1,
1292     .bdrv_get_info          = qemu_rbd_getinfo,
1293     .create_opts            = &qemu_rbd_create_opts,
1294     .bdrv_getlength         = qemu_rbd_getlength,
1295     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1296     .protocol_name          = "rbd",
1297 
1298     .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1299     .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1300 
1301 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1302     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1303 #else
1304     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1305 #endif
1306 
1307 #ifdef LIBRBD_SUPPORTS_DISCARD
1308     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1309 #endif
1310 
1311     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1312     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1313     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1314     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1315 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1316     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1317 #endif
1318 
1319     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1320 };
1321 
/* Register the rbd driver description with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() in through QEMU's block_init() module macro. */
block_init(bdrv_rbd_init);
1328