xref: /qemu/block/rbd.c (revision 8063396b)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "sysemu/replay.h"
26 #include "qapi/qmp/qstring.h"
27 #include "qapi/qmp/qdict.h"
28 #include "qapi/qmp/qjson.h"
29 #include "qapi/qmp/qlist.h"
30 #include "qapi/qobject-input-visitor.h"
31 #include "qapi/qapi-visit-block-core.h"
32 
33 /*
34  * When specifying the image filename use:
35  *
36  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
37  *
38  * poolname must be the name of an existing rados pool.
39  *
40  * devicename is the name of the rbd image.
41  *
42  * Each option given is used to configure rados, and may be any valid
43  * Ceph option, "id", or "conf".
44  *
45  * The "id" option indicates what user we should authenticate as to
46  * the Ceph cluster.  If it is excluded we will use the Ceph default
47  * (normally 'admin').
48  *
49  * The "conf" option specifies a Ceph configuration file to read.  If
50  * it is not specified, we will read from the default Ceph locations
51  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
52  * file, specify conf=/dev/null.
53  *
54  * Configuration values containing :, @, or = can be escaped with a
55  * leading "\".
56  */
57 
58 /* rbd_aio_discard() is only available starting with librbd 0.1.2 */
59 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
60 #define LIBRBD_SUPPORTS_DISCARD
61 #else
62 #undef LIBRBD_SUPPORTS_DISCARD
63 #endif
64 
/* Size in bytes of an object of order OBJ_DEFAULT_OBJ_ORDER */
65 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
66 
/* NOTE(review): presumably an upper bound on listed snapshots -- confirm */
67 #define RBD_MAX_SNAPS 100
68 
/*
 * LIBRBD_USE_IOVEC is always defined (1 or 0) so that it can be tested with
 * a plain C "if" in addition to the preprocessor.
 */
69 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
70 #ifdef LIBRBD_SUPPORTS_IOVEC
71 #define LIBRBD_USE_IOVEC 1
72 #else
73 #define LIBRBD_USE_IOVEC 0
74 #endif
75 
/* Kind of asynchronous request; selects the librbd call in rbd_start_aio() */
76 typedef enum {
77     RBD_AIO_READ,
78     RBD_AIO_WRITE,
79     RBD_AIO_DISCARD,
80     RBD_AIO_FLUSH
81 } RBDAIOCmd;
82 
/* Per-request state, obtained with qemu_aio_get() in rbd_start_aio() */
83 typedef struct RBDAIOCB {
84     BlockAIOCB common;      /* generic part: bs, completion cb and opaque */
85     int64_t ret;            /* result; the callback gets 0 when this is > 0 */
86     QEMUIOVector *qiov;     /* caller's I/O vector; may be NULL (e.g. flush) */
87     char *bounce;           /* linear buffer, only used if !LIBRBD_USE_IOVEC */
88     RBDAIOCmd cmd;          /* request kind */
89     int error;              /* set to 1 once a failure has been recorded */
90     struct BDRVRBDState *s; /* driver state of the node the request is for */
91 } RBDAIOCB;
92 
/*
 * Context handed to librbd as the completion argument; allocated in
 * rbd_start_aio() and freed in qemu_rbd_complete_aio().
 */
93 typedef struct RADOSCB {
94     RBDAIOCB *acb;          /* request this completion belongs to */
95     struct BDRVRBDState *s; /* copy of acb->s */
96     int64_t size;           /* length of the request in bytes */
97     char *buf;              /* acb->bounce; only set if !LIBRBD_USE_IOVEC */
98     int64_t ret;            /* rbd_aio_get_return_value() of the completion */
99 } RADOSCB;
100 
/* Per-node driver state (bs->opaque) */
101 typedef struct BDRVRBDState {
102     rados_t cluster;        /* cluster handle set up by qemu_rbd_connect() */
103     rados_ioctx_t io_ctx;   /* I/O context of the pool */
104     rbd_image_t image;      /* image handle from rbd_open() */
105     char *image_name;       /* owned copy of the image name */
106     char *snap;             /* owned copy of the snapshot name, or NULL */
107     char *namespace;        /* NOTE(review): not set in the visible code */
108     uint64_t image_size;    /* cached size; updated on open and resize */
109 } BDRVRBDState;
110 
/* Forward declaration: qemu_rbd_do_create() calls this before its definition */
111 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
112                             BlockdevOptionsRbd *opts, bool cache,
113                             const char *keypairs, const char *secretid,
114                             Error **errp);
115 
/*
 * Cut the token that starts at @src at the first unescaped @delim.
 *
 * A backslash protects the character that follows it, so an escaped
 * delimiter does not end the token.  When a delimiter is found it is
 * overwritten with NUL and *@p is set to the character after it; otherwise
 * the buffer is left untouched and *@p is set to NULL.
 *
 * Returns @src (the token is not unescaped here).
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *cur = src;

    *p = NULL;

    while (*cur != '\0' && *cur != delim) {
        if (cur[0] == '\\' && cur[1] != '\0') {
            /* step over the backslash; the escaped char is skipped below */
            cur++;
        }
        cur++;
    }

    if (*cur == delim) {
        *cur = '\0';
        *p = cur + 1;
    }

    return src;
}
136 
/*
 * Remove escaping backslashes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept verbatim; a backslash at the very end of the
 * string is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
149 
150 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
151                                     Error **errp)
152 {
153     const char *start;
154     char *p, *buf;
155     QList *keypairs = NULL;
156     char *found_str, *image_name;
157 
158     if (!strstart(filename, "rbd:", &start)) {
159         error_setg(errp, "File name must start with 'rbd:'");
160         return;
161     }
162 
163     buf = g_strdup(start);
164     p = buf;
165 
166     found_str = qemu_rbd_next_tok(p, '/', &p);
167     if (!p) {
168         error_setg(errp, "Pool name is required");
169         goto done;
170     }
171     qemu_rbd_unescape(found_str);
172     qdict_put_str(options, "pool", found_str);
173 
174     if (strchr(p, '@')) {
175         image_name = qemu_rbd_next_tok(p, '@', &p);
176 
177         found_str = qemu_rbd_next_tok(p, ':', &p);
178         qemu_rbd_unescape(found_str);
179         qdict_put_str(options, "snapshot", found_str);
180     } else {
181         image_name = qemu_rbd_next_tok(p, ':', &p);
182     }
183     /* Check for namespace in the image_name */
184     if (strchr(image_name, '/')) {
185         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
186         qemu_rbd_unescape(found_str);
187         qdict_put_str(options, "namespace", found_str);
188     } else {
189         qdict_put_str(options, "namespace", "");
190     }
191     qemu_rbd_unescape(image_name);
192     qdict_put_str(options, "image", image_name);
193     if (!p) {
194         goto done;
195     }
196 
197     /* The following are essentially all key/value pairs, and we treat
198      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
199     while (p) {
200         char *name, *value;
201         name = qemu_rbd_next_tok(p, '=', &p);
202         if (!p) {
203             error_setg(errp, "conf option %s has no value", name);
204             break;
205         }
206 
207         qemu_rbd_unescape(name);
208 
209         value = qemu_rbd_next_tok(p, ':', &p);
210         qemu_rbd_unescape(value);
211 
212         if (!strcmp(name, "conf")) {
213             qdict_put_str(options, "conf", value);
214         } else if (!strcmp(name, "id")) {
215             qdict_put_str(options, "user", value);
216         } else {
217             /*
218              * We pass these internally to qemu_rbd_set_keypairs(), so
219              * we can get away with the simpler list of [ "key1",
220              * "value1", "key2", "value2" ] rather than a raw dict
221              * { "key1": "value1", "key2": "value2" } where we can't
222              * guarantee order, or even a more correct but complex
223              * [ { "key1": "value1" }, { "key2": "value2" } ]
224              */
225             if (!keypairs) {
226                 keypairs = qlist_new();
227             }
228             qlist_append_str(keypairs, name);
229             qlist_append_str(keypairs, value);
230         }
231     }
232 
233     if (keypairs) {
234         qdict_put(options, "=keyvalue-pairs",
235                   qobject_to_json(QOBJECT(keypairs)));
236     }
237 
238 done:
239     g_free(buf);
240     qobject_unref(keypairs);
241     return;
242 }
243 
244 
245 static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
246 {
247     /* XXX Does RBD support AIO on less than 512-byte alignment? */
248     bs->bl.request_alignment = 512;
249 }
250 
251 
252 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
253                              Error **errp)
254 {
255     char *key, *acr;
256     int r;
257     GString *accu;
258     RbdAuthModeList *auth;
259 
260     if (opts->key_secret) {
261         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
262         if (!key) {
263             return -EIO;
264         }
265         r = rados_conf_set(cluster, "key", key);
266         g_free(key);
267         if (r < 0) {
268             error_setg_errno(errp, -r, "Could not set 'key'");
269             return r;
270         }
271     }
272 
273     if (opts->has_auth_client_required) {
274         accu = g_string_new("");
275         for (auth = opts->auth_client_required; auth; auth = auth->next) {
276             if (accu->str[0]) {
277                 g_string_append_c(accu, ';');
278             }
279             g_string_append(accu, RbdAuthMode_str(auth->value));
280         }
281         acr = g_string_free(accu, FALSE);
282         r = rados_conf_set(cluster, "auth_client_required", acr);
283         g_free(acr);
284         if (r < 0) {
285             error_setg_errno(errp, -r,
286                              "Could not set 'auth_client_required'");
287             return r;
288         }
289     }
290 
291     return 0;
292 }
293 
294 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
295                                  Error **errp)
296 {
297     QList *keypairs;
298     QString *name;
299     QString *value;
300     const char *key;
301     size_t remaining;
302     int ret = 0;
303 
304     if (!keypairs_json) {
305         return ret;
306     }
307     keypairs = qobject_to(QList,
308                           qobject_from_json(keypairs_json, &error_abort));
309     remaining = qlist_size(keypairs) / 2;
310     assert(remaining);
311 
312     while (remaining--) {
313         name = qobject_to(QString, qlist_pop(keypairs));
314         value = qobject_to(QString, qlist_pop(keypairs));
315         assert(name && value);
316         key = qstring_get_str(name);
317 
318         ret = rados_conf_set(cluster, key, qstring_get_str(value));
319         qobject_unref(value);
320         if (ret < 0) {
321             error_setg_errno(errp, -ret, "invalid conf option %s", key);
322             qobject_unref(name);
323             ret = -EINVAL;
324             break;
325         }
326         qobject_unref(name);
327     }
328 
329     qobject_unref(keypairs);
330     return ret;
331 }
332 
333 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
334 {
335     if (LIBRBD_USE_IOVEC) {
336         RBDAIOCB *acb = rcb->acb;
337         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
338                    acb->qiov->size - offs);
339     } else {
340         memset(rcb->buf + offs, 0, rcb->size - offs);
341     }
342 }
343 
344 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
345 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
346                               const char *keypairs, const char *password_secret,
347                               Error **errp)
348 {
349     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
350     rados_t cluster;
351     rados_ioctx_t io_ctx;
352     int obj_order = 0;
353     int ret;
354 
355     assert(options->driver == BLOCKDEV_DRIVER_RBD);
356     if (opts->location->has_snapshot) {
357         error_setg(errp, "Can't use snapshot name for image creation");
358         return -EINVAL;
359     }
360 
361     if (opts->has_cluster_size) {
362         int64_t objsize = opts->cluster_size;
363         if ((objsize - 1) & objsize) {    /* not a power of 2? */
364             error_setg(errp, "obj size needs to be power of 2");
365             return -EINVAL;
366         }
367         if (objsize < 4096) {
368             error_setg(errp, "obj size too small");
369             return -EINVAL;
370         }
371         obj_order = ctz32(objsize);
372     }
373 
374     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
375                            password_secret, errp);
376     if (ret < 0) {
377         return ret;
378     }
379 
380     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
381     if (ret < 0) {
382         error_setg_errno(errp, -ret, "error rbd create");
383         goto out;
384     }
385 
386     ret = 0;
387 out:
388     rados_ioctx_destroy(io_ctx);
389     rados_shutdown(cluster);
390     return ret;
391 }
392 
393 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
394 {
395     return qemu_rbd_do_create(options, NULL, NULL, errp);
396 }
397 
398 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
399                                                 const char *filename,
400                                                 QemuOpts *opts,
401                                                 Error **errp)
402 {
403     BlockdevCreateOptions *create_options;
404     BlockdevCreateOptionsRbd *rbd_opts;
405     BlockdevOptionsRbd *loc;
406     Error *local_err = NULL;
407     const char *keypairs, *password_secret;
408     QDict *options = NULL;
409     int ret = 0;
410 
411     create_options = g_new0(BlockdevCreateOptions, 1);
412     create_options->driver = BLOCKDEV_DRIVER_RBD;
413     rbd_opts = &create_options->u.rbd;
414 
415     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
416 
417     password_secret = qemu_opt_get(opts, "password-secret");
418 
419     /* Read out options */
420     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
421                               BDRV_SECTOR_SIZE);
422     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
423                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
424     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
425 
426     options = qdict_new();
427     qemu_rbd_parse_filename(filename, options, &local_err);
428     if (local_err) {
429         ret = -EINVAL;
430         error_propagate(errp, local_err);
431         goto exit;
432     }
433 
434     /*
435      * Caution: while qdict_get_try_str() is fine, getting non-string
436      * types would require more care.  When @options come from -blockdev
437      * or blockdev_add, its members are typed according to the QAPI
438      * schema, but when they come from -drive, they're all QString.
439      */
440     loc = rbd_opts->location;
441     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
442     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
443     loc->has_conf    = !!loc->conf;
444     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
445     loc->has_user    = !!loc->user;
446     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
447     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
448     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
449 
450     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
451     if (ret < 0) {
452         goto exit;
453     }
454 
455 exit:
456     qobject_unref(options);
457     qapi_free_BlockdevCreateOptions(create_options);
458     return ret;
459 }
460 
461 /*
462  * This aio completion is being called from rbd_finish_bh() and runs in qemu
463  * BH context.
464  */
465 static void qemu_rbd_complete_aio(RADOSCB *rcb)
466 {
467     RBDAIOCB *acb = rcb->acb;
468     int64_t r;
469 
470     r = rcb->ret;
471 
472     if (acb->cmd != RBD_AIO_READ) {
473         if (r < 0) {
474             acb->ret = r;
475             acb->error = 1;
476         } else if (!acb->error) {
477             acb->ret = rcb->size;
478         }
479     } else {
480         if (r < 0) {
481             qemu_rbd_memset(rcb, 0);
482             acb->ret = r;
483             acb->error = 1;
484         } else if (r < rcb->size) {
485             qemu_rbd_memset(rcb, r);
486             if (!acb->error) {
487                 acb->ret = rcb->size;
488             }
489         } else if (!acb->error) {
490             acb->ret = r;
491         }
492     }
493 
494     g_free(rcb);
495 
496     if (!LIBRBD_USE_IOVEC) {
497         if (acb->cmd == RBD_AIO_READ) {
498             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
499         }
500         qemu_vfree(acb->bounce);
501     }
502 
503     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
504 
505     qemu_aio_unref(acb);
506 }
507 
508 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
509 {
510     const char **vals;
511     const char *host, *port;
512     char *rados_str;
513     InetSocketAddressBaseList *p;
514     int i, cnt;
515 
516     if (!opts->has_server) {
517         return NULL;
518     }
519 
520     for (cnt = 0, p = opts->server; p; p = p->next) {
521         cnt++;
522     }
523 
524     vals = g_new(const char *, cnt + 1);
525 
526     for (i = 0, p = opts->server; p; p = p->next, i++) {
527         host = p->value->host;
528         port = p->value->port;
529 
530         if (strchr(host, ':')) {
531             vals[i] = g_strdup_printf("[%s]:%s", host, port);
532         } else {
533             vals[i] = g_strdup_printf("%s:%s", host, port);
534         }
535     }
536     vals[i] = NULL;
537 
538     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
539     g_strfreev((char **)vals);
540     return rados_str;
541 }
542 
543 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
544                             BlockdevOptionsRbd *opts, bool cache,
545                             const char *keypairs, const char *secretid,
546                             Error **errp)
547 {
548     char *mon_host = NULL;
549     Error *local_err = NULL;
550     int r;
551 
552     if (secretid) {
553         if (opts->key_secret) {
554             error_setg(errp,
555                        "Legacy 'password-secret' clashes with 'key-secret'");
556             return -EINVAL;
557         }
558         opts->key_secret = g_strdup(secretid);
559         opts->has_key_secret = true;
560     }
561 
562     mon_host = qemu_rbd_mon_host(opts, &local_err);
563     if (local_err) {
564         error_propagate(errp, local_err);
565         r = -EINVAL;
566         goto failed_opts;
567     }
568 
569     r = rados_create(cluster, opts->user);
570     if (r < 0) {
571         error_setg_errno(errp, -r, "error initializing");
572         goto failed_opts;
573     }
574 
575     /* try default location when conf=NULL, but ignore failure */
576     r = rados_conf_read_file(*cluster, opts->conf);
577     if (opts->has_conf && r < 0) {
578         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
579         goto failed_shutdown;
580     }
581 
582     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
583     if (r < 0) {
584         goto failed_shutdown;
585     }
586 
587     if (mon_host) {
588         r = rados_conf_set(*cluster, "mon_host", mon_host);
589         if (r < 0) {
590             goto failed_shutdown;
591         }
592     }
593 
594     r = qemu_rbd_set_auth(*cluster, opts, errp);
595     if (r < 0) {
596         goto failed_shutdown;
597     }
598 
599     /*
600      * Fallback to more conservative semantics if setting cache
601      * options fails. Ignore errors from setting rbd_cache because the
602      * only possible error is that the option does not exist, and
603      * librbd defaults to no caching. If write through caching cannot
604      * be set up, fall back to no caching.
605      */
606     if (cache) {
607         rados_conf_set(*cluster, "rbd_cache", "true");
608     } else {
609         rados_conf_set(*cluster, "rbd_cache", "false");
610     }
611 
612     r = rados_connect(*cluster);
613     if (r < 0) {
614         error_setg_errno(errp, -r, "error connecting");
615         goto failed_shutdown;
616     }
617 
618     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
619     if (r < 0) {
620         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
621         goto failed_shutdown;
622     }
623     /*
624      * Set the namespace after opening the io context on the pool,
625      * if nspace == NULL or if nspace == "", it is just as we did nothing
626      */
627     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
628 
629     return 0;
630 
631 failed_shutdown:
632     rados_shutdown(*cluster);
633 failed_opts:
634     g_free(mon_host);
635     return r;
636 }
637 
638 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
639                                     Error **errp)
640 {
641     Visitor *v;
642 
643     /* Convert the remaining options into a QAPI object */
644     v = qobject_input_visitor_new_flat_confused(options, errp);
645     if (!v) {
646         return -EINVAL;
647     }
648 
649     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
650     visit_free(v);
651     if (!opts) {
652         return -EINVAL;
653     }
654 
655     return 0;
656 }
657 
658 static int qemu_rbd_attempt_legacy_options(QDict *options,
659                                            BlockdevOptionsRbd **opts,
660                                            char **keypairs)
661 {
662     char *filename;
663     int r;
664 
665     filename = g_strdup(qdict_get_try_str(options, "filename"));
666     if (!filename) {
667         return -EINVAL;
668     }
669     qdict_del(options, "filename");
670 
671     qemu_rbd_parse_filename(filename, options, NULL);
672 
673     /* keypairs freed by caller */
674     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
675     if (*keypairs) {
676         qdict_del(options, "=keyvalue-pairs");
677     }
678 
679     r = qemu_rbd_convert_options(options, opts, NULL);
680 
681     g_free(filename);
682     return r;
683 }
684 
685 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
686                          Error **errp)
687 {
688     BDRVRBDState *s = bs->opaque;
689     BlockdevOptionsRbd *opts = NULL;
690     const QDictEntry *e;
691     Error *local_err = NULL;
692     char *keypairs, *secretid;
693     int r;
694 
695     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
696     if (keypairs) {
697         qdict_del(options, "=keyvalue-pairs");
698     }
699 
700     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
701     if (secretid) {
702         qdict_del(options, "password-secret");
703     }
704 
705     r = qemu_rbd_convert_options(options, &opts, &local_err);
706     if (local_err) {
707         /* If keypairs are present, that means some options are present in
708          * the modern option format.  Don't attempt to parse legacy option
709          * formats, as we won't support mixed usage. */
710         if (keypairs) {
711             error_propagate(errp, local_err);
712             goto out;
713         }
714 
715         /* If the initial attempt to convert and process the options failed,
716          * we may be attempting to open an image file that has the rbd options
717          * specified in the older format consisting of all key/value pairs
718          * encoded in the filename.  Go ahead and attempt to parse the
719          * filename, and see if we can pull out the required options. */
720         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
721         if (r < 0) {
722             /* Propagate the original error, not the legacy parsing fallback
723              * error, as the latter was just a best-effort attempt. */
724             error_propagate(errp, local_err);
725             goto out;
726         }
727         /* Take care whenever deciding to actually deprecate; once this ability
728          * is removed, we will not be able to open any images with legacy-styled
729          * backing image strings. */
730         warn_report("RBD options encoded in the filename as keyvalue pairs "
731                     "is deprecated");
732     }
733 
734     /* Remove the processed options from the QDict (the visitor processes
735      * _all_ options in the QDict) */
736     while ((e = qdict_first(options))) {
737         qdict_del(options, e->key);
738     }
739 
740     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
741                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
742     if (r < 0) {
743         goto out;
744     }
745 
746     s->snap = g_strdup(opts->snapshot);
747     s->image_name = g_strdup(opts->image);
748 
749     /* rbd_open is always r/w */
750     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
751     if (r < 0) {
752         error_setg_errno(errp, -r, "error reading header from %s",
753                          s->image_name);
754         goto failed_open;
755     }
756 
757     r = rbd_get_size(s->image, &s->image_size);
758     if (r < 0) {
759         error_setg_errno(errp, -r, "error getting image size from %s",
760                          s->image_name);
761         rbd_close(s->image);
762         goto failed_open;
763     }
764 
765     /* If we are using an rbd snapshot, we must be r/o, otherwise
766      * leave as-is */
767     if (s->snap != NULL) {
768         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
769         if (r < 0) {
770             rbd_close(s->image);
771             goto failed_open;
772         }
773     }
774 
775     /* When extending regular files, we get zeros from the OS */
776     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
777 
778     r = 0;
779     goto out;
780 
781 failed_open:
782     rados_ioctx_destroy(s->io_ctx);
783     g_free(s->snap);
784     g_free(s->image_name);
785     rados_shutdown(s->cluster);
786 out:
787     qapi_free_BlockdevOptionsRbd(opts);
788     g_free(keypairs);
789     g_free(secretid);
790     return r;
791 }
792 
793 
794 /* Since RBD is currently always opened R/W via the API,
795  * we just need to check if we are using a snapshot or not, in
796  * order to determine if we will allow it to be R/W */
797 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
798                                    BlockReopenQueue *queue, Error **errp)
799 {
800     BDRVRBDState *s = state->bs->opaque;
801     int ret = 0;
802 
803     if (s->snap && state->flags & BDRV_O_RDWR) {
804         error_setg(errp,
805                    "Cannot change node '%s' to r/w when using RBD snapshot",
806                    bdrv_get_device_or_node_name(state->bs));
807         ret = -EINVAL;
808     }
809 
810     return ret;
811 }
812 
813 static void qemu_rbd_close(BlockDriverState *bs)
814 {
815     BDRVRBDState *s = bs->opaque;
816 
817     rbd_close(s->image);
818     rados_ioctx_destroy(s->io_ctx);
819     g_free(s->snap);
820     g_free(s->image_name);
821     rados_shutdown(s->cluster);
822 }
823 
824 /* Resize the RBD image and update the 'image_size' with the current size */
825 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
826 {
827     BDRVRBDState *s = bs->opaque;
828     int r;
829 
830     r = rbd_resize(s->image, size);
831     if (r < 0) {
832         return r;
833     }
834 
835     s->image_size = size;
836 
837     return 0;
838 }
839 
/* AIOCB description passed to qemu_aio_get(): allocation size of RBDAIOCB */
840 static const AIOCBInfo rbd_aiocb_info = {
841     .aiocb_size = sizeof(RBDAIOCB),
842 };
843 
/* Bottom half scheduled by rbd_finish_aiocb(); @opaque is the RADOSCB. */
static void rbd_finish_bh(void *opaque)
{
    qemu_rbd_complete_aio(opaque);
}
849 
850 /*
851  * This is the callback function for rbd_aio_read and _write
852  *
853  * Note: this function is being called from a non qemu thread so
854  * we need to be careful about what we do here. Generally we only
855  * schedule a BH, and do the rest of the io completion handling
856  * from rbd_finish_bh() which runs in a qemu context.
857  */
858 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
859 {
860     RBDAIOCB *acb = rcb->acb;
861 
862     rcb->ret = rbd_aio_get_return_value(c);
863     rbd_aio_release(c);
864 
865     replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
866                                      rbd_finish_bh, rcb);
867 }
868 
869 static int rbd_aio_discard_wrapper(rbd_image_t image,
870                                    uint64_t off,
871                                    uint64_t len,
872                                    rbd_completion_t comp)
873 {
874 #ifdef LIBRBD_SUPPORTS_DISCARD
875     return rbd_aio_discard(image, off, len, comp);
876 #else
877     return -ENOTSUP;
878 #endif
879 }
880 
881 static int rbd_aio_flush_wrapper(rbd_image_t image,
882                                  rbd_completion_t comp)
883 {
884 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
885     return rbd_aio_flush(image, comp);
886 #else
887     return -ENOTSUP;
888 #endif
889 }
890 
891 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
892                                  int64_t off,
893                                  QEMUIOVector *qiov,
894                                  int64_t size,
895                                  BlockCompletionFunc *cb,
896                                  void *opaque,
897                                  RBDAIOCmd cmd)
898 {
899     RBDAIOCB *acb;
900     RADOSCB *rcb = NULL;
901     rbd_completion_t c;
902     int r;
903 
904     BDRVRBDState *s = bs->opaque;
905 
906     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
907     acb->cmd = cmd;
908     acb->qiov = qiov;
909     assert(!qiov || qiov->size == size);
910 
911     rcb = g_new(RADOSCB, 1);
912 
913     if (!LIBRBD_USE_IOVEC) {
914         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
915             acb->bounce = NULL;
916         } else {
917             acb->bounce = qemu_try_blockalign(bs, qiov->size);
918             if (acb->bounce == NULL) {
919                 goto failed;
920             }
921         }
922         if (cmd == RBD_AIO_WRITE) {
923             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
924         }
925         rcb->buf = acb->bounce;
926     }
927 
928     acb->ret = 0;
929     acb->error = 0;
930     acb->s = s;
931 
932     rcb->acb = acb;
933     rcb->s = acb->s;
934     rcb->size = size;
935     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
936     if (r < 0) {
937         goto failed;
938     }
939 
940     switch (cmd) {
941     case RBD_AIO_WRITE: {
942         /*
943          * RBD APIs don't allow us to write more than actual size, so in order
944          * to support growing images, we resize the image before write
945          * operations that exceed the current size.
946          */
947         if (off + size > s->image_size) {
948             r = qemu_rbd_resize(bs, off + size);
949             if (r < 0) {
950                 goto failed_completion;
951             }
952         }
953 #ifdef LIBRBD_SUPPORTS_IOVEC
954             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
955 #else
956             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
957 #endif
958         break;
959     }
960     case RBD_AIO_READ:
961 #ifdef LIBRBD_SUPPORTS_IOVEC
962             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
963 #else
964             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
965 #endif
966         break;
967     case RBD_AIO_DISCARD:
968         r = rbd_aio_discard_wrapper(s->image, off, size, c);
969         break;
970     case RBD_AIO_FLUSH:
971         r = rbd_aio_flush_wrapper(s->image, c);
972         break;
973     default:
974         r = -EINVAL;
975     }
976 
977     if (r < 0) {
978         goto failed_completion;
979     }
980     return &acb->common;
981 
982 failed_completion:
983     rbd_aio_release(c);
984 failed:
985     g_free(rcb);
986     if (!LIBRBD_USE_IOVEC) {
987         qemu_vfree(acb->bounce);
988     }
989 
990     qemu_aio_unref(acb);
991     return NULL;
992 }
993 
994 static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
995                                        uint64_t offset, uint64_t bytes,
996                                        QEMUIOVector *qiov, int flags,
997                                        BlockCompletionFunc *cb,
998                                        void *opaque)
999 {
1000     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1001                          RBD_AIO_READ);
1002 }
1003 
1004 static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1005                                         uint64_t offset, uint64_t bytes,
1006                                         QEMUIOVector *qiov, int flags,
1007                                         BlockCompletionFunc *cb,
1008                                         void *opaque)
1009 {
1010     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1011                          RBD_AIO_WRITE);
1012 }
1013 
1014 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1015 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1016                                       BlockCompletionFunc *cb,
1017                                       void *opaque)
1018 {
1019     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1020 }
1021 
1022 #else
1023 
1024 static int qemu_rbd_co_flush(BlockDriverState *bs)
1025 {
1026 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1027     /* rbd_flush added in 0.1.1 */
1028     BDRVRBDState *s = bs->opaque;
1029     return rbd_flush(s->image);
1030 #else
1031     return 0;
1032 #endif
1033 }
1034 #endif
1035 
1036 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1037 {
1038     BDRVRBDState *s = bs->opaque;
1039     rbd_image_info_t info;
1040     int r;
1041 
1042     r = rbd_stat(s->image, &info, sizeof(info));
1043     if (r < 0) {
1044         return r;
1045     }
1046 
1047     bdi->cluster_size = info.obj_size;
1048     return 0;
1049 }
1050 
1051 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1052 {
1053     BDRVRBDState *s = bs->opaque;
1054     rbd_image_info_t info;
1055     int r;
1056 
1057     r = rbd_stat(s->image, &info, sizeof(info));
1058     if (r < 0) {
1059         return r;
1060     }
1061 
1062     return info.size;
1063 }
1064 
1065 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1066                                              int64_t offset,
1067                                              bool exact,
1068                                              PreallocMode prealloc,
1069                                              BdrvRequestFlags flags,
1070                                              Error **errp)
1071 {
1072     int r;
1073 
1074     if (prealloc != PREALLOC_MODE_OFF) {
1075         error_setg(errp, "Unsupported preallocation mode '%s'",
1076                    PreallocMode_str(prealloc));
1077         return -ENOTSUP;
1078     }
1079 
1080     r = qemu_rbd_resize(bs, offset);
1081     if (r < 0) {
1082         error_setg_errno(errp, -r, "Failed to resize file");
1083         return r;
1084     }
1085 
1086     return 0;
1087 }
1088 
1089 static int qemu_rbd_snap_create(BlockDriverState *bs,
1090                                 QEMUSnapshotInfo *sn_info)
1091 {
1092     BDRVRBDState *s = bs->opaque;
1093     int r;
1094 
1095     if (sn_info->name[0] == '\0') {
1096         return -EINVAL; /* we need a name for rbd snapshots */
1097     }
1098 
1099     /*
1100      * rbd snapshots are using the name as the user controlled unique identifier
1101      * we can't use the rbd snapid for that purpose, as it can't be set
1102      */
1103     if (sn_info->id_str[0] != '\0' &&
1104         strcmp(sn_info->id_str, sn_info->name) != 0) {
1105         return -EINVAL;
1106     }
1107 
1108     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1109         return -ERANGE;
1110     }
1111 
1112     r = rbd_snap_create(s->image, sn_info->name);
1113     if (r < 0) {
1114         error_report("failed to create snap: %s", strerror(-r));
1115         return r;
1116     }
1117 
1118     return 0;
1119 }
1120 
1121 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1122                                 const char *snapshot_id,
1123                                 const char *snapshot_name,
1124                                 Error **errp)
1125 {
1126     BDRVRBDState *s = bs->opaque;
1127     int r;
1128 
1129     if (!snapshot_name) {
1130         error_setg(errp, "rbd need a valid snapshot name");
1131         return -EINVAL;
1132     }
1133 
1134     /* If snapshot_id is specified, it must be equal to name, see
1135        qemu_rbd_snap_list() */
1136     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1137         error_setg(errp,
1138                    "rbd do not support snapshot id, it should be NULL or "
1139                    "equal to snapshot name");
1140         return -EINVAL;
1141     }
1142 
1143     r = rbd_snap_remove(s->image, snapshot_name);
1144     if (r < 0) {
1145         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1146     }
1147     return r;
1148 }
1149 
1150 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1151                                   const char *snapshot_name)
1152 {
1153     BDRVRBDState *s = bs->opaque;
1154 
1155     return rbd_snap_rollback(s->image, snapshot_name);
1156 }
1157 
1158 static int qemu_rbd_snap_list(BlockDriverState *bs,
1159                               QEMUSnapshotInfo **psn_tab)
1160 {
1161     BDRVRBDState *s = bs->opaque;
1162     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1163     int i, snap_count;
1164     rbd_snap_info_t *snaps;
1165     int max_snaps = RBD_MAX_SNAPS;
1166 
1167     do {
1168         snaps = g_new(rbd_snap_info_t, max_snaps);
1169         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1170         if (snap_count <= 0) {
1171             g_free(snaps);
1172         }
1173     } while (snap_count == -ERANGE);
1174 
1175     if (snap_count <= 0) {
1176         goto done;
1177     }
1178 
1179     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1180 
1181     for (i = 0; i < snap_count; i++) {
1182         const char *snap_name = snaps[i].name;
1183 
1184         sn_info = sn_tab + i;
1185         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1186         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1187 
1188         sn_info->vm_state_size = snaps[i].size;
1189         sn_info->date_sec = 0;
1190         sn_info->date_nsec = 0;
1191         sn_info->vm_clock_nsec = 0;
1192     }
1193     rbd_snap_list_end(snaps);
1194     g_free(snaps);
1195 
1196  done:
1197     *psn_tab = sn_tab;
1198     return snap_count;
1199 }
1200 
#ifdef LIBRBD_SUPPORTS_DISCARD
/*
 * Asynchronous discard of @bytes bytes starting at @offset, issued through
 * rbd_start_aio() as an RBD_AIO_DISCARD request without an I/O vector.
 */
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
                                         int64_t offset,
                                         int bytes,
                                         BlockCompletionFunc *cb,
                                         void *opaque)
{
    BlockAIOCB *acb = rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
                                    RBD_AIO_DISCARD);

    return acb;
}
#endif
1212 
#ifdef LIBRBD_SUPPORTS_INVALIDATE
/*
 * Ask librbd to invalidate its cache for the image.  A negative result
 * from rbd_invalidate_cache() is reported through @errp.
 */
static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
                                                      Error **errp)
{
    BDRVRBDState *state = bs->opaque;
    int ret = rbd_invalidate_cache(state->image);

    if (ret >= 0) {
        return;
    }

    error_setg_errno(errp, -ret, "Failed to invalidate the cache");
}
#endif
1224 
1225 static QemuOptsList qemu_rbd_create_opts = {
1226     .name = "rbd-create-opts",
1227     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1228     .desc = {
1229         {
1230             .name = BLOCK_OPT_SIZE,
1231             .type = QEMU_OPT_SIZE,
1232             .help = "Virtual disk size"
1233         },
1234         {
1235             .name = BLOCK_OPT_CLUSTER_SIZE,
1236             .type = QEMU_OPT_SIZE,
1237             .help = "RBD object size"
1238         },
1239         {
1240             .name = "password-secret",
1241             .type = QEMU_OPT_STRING,
1242             .help = "ID of secret providing the password",
1243         },
1244         { /* end of list */ }
1245     }
1246 };
1247 
/*
 * NULL-terminated list of option names published through the driver's
 * strong_runtime_opts field.
 * NOTE(review): "server." ends in a dot, presumably so that it is matched
 * as a prefix covering every server.* key -- confirm against the consumer
 * of this list.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* Which image to open */
    "pool", "namespace", "image", "conf", "snapshot",
    /* How to reach and authenticate against the cluster */
    "user", "server.", "password-secret",

    NULL
};
1260 
1261 static BlockDriver bdrv_rbd = {
1262     .format_name            = "rbd",
1263     .instance_size          = sizeof(BDRVRBDState),
1264     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1265     .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1266     .bdrv_file_open         = qemu_rbd_open,
1267     .bdrv_close             = qemu_rbd_close,
1268     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1269     .bdrv_co_create         = qemu_rbd_co_create,
1270     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1271     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1272     .bdrv_get_info          = qemu_rbd_getinfo,
1273     .create_opts            = &qemu_rbd_create_opts,
1274     .bdrv_getlength         = qemu_rbd_getlength,
1275     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1276     .protocol_name          = "rbd",
1277 
1278     .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1279     .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1280 
1281 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1282     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1283 #else
1284     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1285 #endif
1286 
1287 #ifdef LIBRBD_SUPPORTS_DISCARD
1288     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1289 #endif
1290 
1291     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1292     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1293     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1294     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1295 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1296     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1297 #endif
1298 
1299     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1300 };
1301 
1302 static void bdrv_rbd_init(void)
1303 {
1304     bdrv_register(&bdrv_rbd);
1305 }
1306 
1307 block_init(bdrv_rbd_init);
1308