xref: /qemu/block/rbd.c (revision d0fb9657)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "sysemu/replay.h"
26 #include "qapi/qmp/qstring.h"
27 #include "qapi/qmp/qdict.h"
28 #include "qapi/qmp/qjson.h"
29 #include "qapi/qmp/qlist.h"
30 #include "qapi/qobject-input-visitor.h"
31 #include "qapi/qapi-visit-block-core.h"
32 
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; when it is omitted an empty namespace is used.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates which user we should authenticate as to
 * the Ceph cluster.  If it is omitted, we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
57 
58 /* rbd_aio_discard added in 0.1.2 */
59 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
60 #define LIBRBD_SUPPORTS_DISCARD
61 #else
62 #undef LIBRBD_SUPPORTS_DISCARD
63 #endif
64 
65 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
66 
67 #define RBD_MAX_SNAPS 100
68 
69 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
70 #ifdef LIBRBD_SUPPORTS_IOVEC
71 #define LIBRBD_USE_IOVEC 1
72 #else
73 #define LIBRBD_USE_IOVEC 0
74 #endif
75 
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ    = 0,
    RBD_AIO_WRITE   = 1,
    RBD_AIO_DISCARD = 2,
    RBD_AIO_FLUSH   = 3
} RBDAIOCmd;
82 
83 typedef struct RBDAIOCB {
84     BlockAIOCB common;
85     int64_t ret;
86     QEMUIOVector *qiov;
87     char *bounce;
88     RBDAIOCmd cmd;
89     int error;
90     struct BDRVRBDState *s;
91 } RBDAIOCB;
92 
93 typedef struct RADOSCB {
94     RBDAIOCB *acb;
95     struct BDRVRBDState *s;
96     int64_t size;
97     char *buf;
98     int64_t ret;
99 } RADOSCB;
100 
101 typedef struct BDRVRBDState {
102     rados_t cluster;
103     rados_ioctx_t io_ctx;
104     rbd_image_t image;
105     char *image_name;
106     char *snap;
107     char *namespace;
108     uint64_t image_size;
109 } BDRVRBDState;
110 
111 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
112                             BlockdevOptionsRbd *opts, bool cache,
113                             const char *keypairs, const char *secretid,
114                             Error **errp);
115 
/*
 * Locate the first occurrence of @delim in @src that is not escaped.
 *
 * A backslash followed by any character is treated as one escaped pair
 * and skipped as a whole, unless the backslash itself is the delimiter
 * being searched for.  Returns a pointer to the delimiter, or NULL if
 * no unescaped delimiter exists.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        if (*cur == '\\' && cur[1] != '\0') {
            /* step over the backslash and the character it escapes */
            cur += 2;
        } else {
            cur += 1;
        }
    }

    return NULL;
}
131 
132 
/*
 * Split @src in place at the first unescaped @delim.
 *
 * The delimiter is overwritten with NUL and *@p is set to the character
 * following it; when no delimiter is found *@p is set to NULL and @src
 * is left untouched.  The return value is always @src, i.e. the start
 * of the token.  @p may point at the caller's own copy of @src.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep == NULL) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
146 
/*
 * Remove backslash escapes from @src in place.
 *
 * Every backslash that is followed by another character is dropped and
 * the following character is kept literally.  A lone backslash at the
 * very end of the string is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
159 
160 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
161                                     Error **errp)
162 {
163     const char *start;
164     char *p, *buf;
165     QList *keypairs = NULL;
166     char *found_str, *image_name;
167 
168     if (!strstart(filename, "rbd:", &start)) {
169         error_setg(errp, "File name must start with 'rbd:'");
170         return;
171     }
172 
173     buf = g_strdup(start);
174     p = buf;
175 
176     found_str = qemu_rbd_next_tok(p, '/', &p);
177     if (!p) {
178         error_setg(errp, "Pool name is required");
179         goto done;
180     }
181     qemu_rbd_unescape(found_str);
182     qdict_put_str(options, "pool", found_str);
183 
184     if (qemu_rbd_strchr(p, '@')) {
185         image_name = qemu_rbd_next_tok(p, '@', &p);
186 
187         found_str = qemu_rbd_next_tok(p, ':', &p);
188         qemu_rbd_unescape(found_str);
189         qdict_put_str(options, "snapshot", found_str);
190     } else {
191         image_name = qemu_rbd_next_tok(p, ':', &p);
192     }
193     /* Check for namespace in the image_name */
194     if (qemu_rbd_strchr(image_name, '/')) {
195         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
196         qemu_rbd_unescape(found_str);
197         qdict_put_str(options, "namespace", found_str);
198     } else {
199         qdict_put_str(options, "namespace", "");
200     }
201     qemu_rbd_unescape(image_name);
202     qdict_put_str(options, "image", image_name);
203     if (!p) {
204         goto done;
205     }
206 
207     /* The following are essentially all key/value pairs, and we treat
208      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
209     while (p) {
210         char *name, *value;
211         name = qemu_rbd_next_tok(p, '=', &p);
212         if (!p) {
213             error_setg(errp, "conf option %s has no value", name);
214             break;
215         }
216 
217         qemu_rbd_unescape(name);
218 
219         value = qemu_rbd_next_tok(p, ':', &p);
220         qemu_rbd_unescape(value);
221 
222         if (!strcmp(name, "conf")) {
223             qdict_put_str(options, "conf", value);
224         } else if (!strcmp(name, "id")) {
225             qdict_put_str(options, "user", value);
226         } else {
227             /*
228              * We pass these internally to qemu_rbd_set_keypairs(), so
229              * we can get away with the simpler list of [ "key1",
230              * "value1", "key2", "value2" ] rather than a raw dict
231              * { "key1": "value1", "key2": "value2" } where we can't
232              * guarantee order, or even a more correct but complex
233              * [ { "key1": "value1" }, { "key2": "value2" } ]
234              */
235             if (!keypairs) {
236                 keypairs = qlist_new();
237             }
238             qlist_append_str(keypairs, name);
239             qlist_append_str(keypairs, value);
240         }
241     }
242 
243     if (keypairs) {
244         qdict_put(options, "=keyvalue-pairs",
245                   qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
246     }
247 
248 done:
249     g_free(buf);
250     qobject_unref(keypairs);
251     return;
252 }
253 
254 
255 static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
256 {
257     /* XXX Does RBD support AIO on less than 512-byte alignment? */
258     bs->bl.request_alignment = 512;
259 }
260 
261 
262 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
263                              Error **errp)
264 {
265     char *key, *acr;
266     int r;
267     GString *accu;
268     RbdAuthModeList *auth;
269 
270     if (opts->key_secret) {
271         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
272         if (!key) {
273             return -EIO;
274         }
275         r = rados_conf_set(cluster, "key", key);
276         g_free(key);
277         if (r < 0) {
278             error_setg_errno(errp, -r, "Could not set 'key'");
279             return r;
280         }
281     }
282 
283     if (opts->has_auth_client_required) {
284         accu = g_string_new("");
285         for (auth = opts->auth_client_required; auth; auth = auth->next) {
286             if (accu->str[0]) {
287                 g_string_append_c(accu, ';');
288             }
289             g_string_append(accu, RbdAuthMode_str(auth->value));
290         }
291         acr = g_string_free(accu, FALSE);
292         r = rados_conf_set(cluster, "auth_client_required", acr);
293         g_free(acr);
294         if (r < 0) {
295             error_setg_errno(errp, -r,
296                              "Could not set 'auth_client_required'");
297             return r;
298         }
299     }
300 
301     return 0;
302 }
303 
304 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
305                                  Error **errp)
306 {
307     QList *keypairs;
308     QString *name;
309     QString *value;
310     const char *key;
311     size_t remaining;
312     int ret = 0;
313 
314     if (!keypairs_json) {
315         return ret;
316     }
317     keypairs = qobject_to(QList,
318                           qobject_from_json(keypairs_json, &error_abort));
319     remaining = qlist_size(keypairs) / 2;
320     assert(remaining);
321 
322     while (remaining--) {
323         name = qobject_to(QString, qlist_pop(keypairs));
324         value = qobject_to(QString, qlist_pop(keypairs));
325         assert(name && value);
326         key = qstring_get_str(name);
327 
328         ret = rados_conf_set(cluster, key, qstring_get_str(value));
329         qobject_unref(value);
330         if (ret < 0) {
331             error_setg_errno(errp, -ret, "invalid conf option %s", key);
332             qobject_unref(name);
333             ret = -EINVAL;
334             break;
335         }
336         qobject_unref(name);
337     }
338 
339     qobject_unref(keypairs);
340     return ret;
341 }
342 
343 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
344 {
345     if (LIBRBD_USE_IOVEC) {
346         RBDAIOCB *acb = rcb->acb;
347         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
348                    acb->qiov->size - offs);
349     } else {
350         memset(rcb->buf + offs, 0, rcb->size - offs);
351     }
352 }
353 
354 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
355 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
356                               const char *keypairs, const char *password_secret,
357                               Error **errp)
358 {
359     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
360     rados_t cluster;
361     rados_ioctx_t io_ctx;
362     int obj_order = 0;
363     int ret;
364 
365     assert(options->driver == BLOCKDEV_DRIVER_RBD);
366     if (opts->location->has_snapshot) {
367         error_setg(errp, "Can't use snapshot name for image creation");
368         return -EINVAL;
369     }
370 
371     if (opts->has_cluster_size) {
372         int64_t objsize = opts->cluster_size;
373         if ((objsize - 1) & objsize) {    /* not a power of 2? */
374             error_setg(errp, "obj size needs to be power of 2");
375             return -EINVAL;
376         }
377         if (objsize < 4096) {
378             error_setg(errp, "obj size too small");
379             return -EINVAL;
380         }
381         obj_order = ctz32(objsize);
382     }
383 
384     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
385                            password_secret, errp);
386     if (ret < 0) {
387         return ret;
388     }
389 
390     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
391     if (ret < 0) {
392         error_setg_errno(errp, -ret, "error rbd create");
393         goto out;
394     }
395 
396     ret = 0;
397 out:
398     rados_ioctx_destroy(io_ctx);
399     rados_shutdown(cluster);
400     return ret;
401 }
402 
403 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
404 {
405     return qemu_rbd_do_create(options, NULL, NULL, errp);
406 }
407 
408 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
409                                                 const char *filename,
410                                                 QemuOpts *opts,
411                                                 Error **errp)
412 {
413     BlockdevCreateOptions *create_options;
414     BlockdevCreateOptionsRbd *rbd_opts;
415     BlockdevOptionsRbd *loc;
416     Error *local_err = NULL;
417     const char *keypairs, *password_secret;
418     QDict *options = NULL;
419     int ret = 0;
420 
421     create_options = g_new0(BlockdevCreateOptions, 1);
422     create_options->driver = BLOCKDEV_DRIVER_RBD;
423     rbd_opts = &create_options->u.rbd;
424 
425     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
426 
427     password_secret = qemu_opt_get(opts, "password-secret");
428 
429     /* Read out options */
430     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
431                               BDRV_SECTOR_SIZE);
432     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
433                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
434     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
435 
436     options = qdict_new();
437     qemu_rbd_parse_filename(filename, options, &local_err);
438     if (local_err) {
439         ret = -EINVAL;
440         error_propagate(errp, local_err);
441         goto exit;
442     }
443 
444     /*
445      * Caution: while qdict_get_try_str() is fine, getting non-string
446      * types would require more care.  When @options come from -blockdev
447      * or blockdev_add, its members are typed according to the QAPI
448      * schema, but when they come from -drive, they're all QString.
449      */
450     loc = rbd_opts->location;
451     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
452     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
453     loc->has_conf    = !!loc->conf;
454     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
455     loc->has_user    = !!loc->user;
456     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
457     loc->has_q_namespace = !!loc->q_namespace;
458     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
459     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
460 
461     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
462     if (ret < 0) {
463         goto exit;
464     }
465 
466 exit:
467     qobject_unref(options);
468     qapi_free_BlockdevCreateOptions(create_options);
469     return ret;
470 }
471 
472 /*
473  * This aio completion is being called from rbd_finish_bh() and runs in qemu
474  * BH context.
475  */
476 static void qemu_rbd_complete_aio(RADOSCB *rcb)
477 {
478     RBDAIOCB *acb = rcb->acb;
479     int64_t r;
480 
481     r = rcb->ret;
482 
483     if (acb->cmd != RBD_AIO_READ) {
484         if (r < 0) {
485             acb->ret = r;
486             acb->error = 1;
487         } else if (!acb->error) {
488             acb->ret = rcb->size;
489         }
490     } else {
491         if (r < 0) {
492             qemu_rbd_memset(rcb, 0);
493             acb->ret = r;
494             acb->error = 1;
495         } else if (r < rcb->size) {
496             qemu_rbd_memset(rcb, r);
497             if (!acb->error) {
498                 acb->ret = rcb->size;
499             }
500         } else if (!acb->error) {
501             acb->ret = r;
502         }
503     }
504 
505     g_free(rcb);
506 
507     if (!LIBRBD_USE_IOVEC) {
508         if (acb->cmd == RBD_AIO_READ) {
509             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
510         }
511         qemu_vfree(acb->bounce);
512     }
513 
514     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
515 
516     qemu_aio_unref(acb);
517 }
518 
519 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
520 {
521     const char **vals;
522     const char *host, *port;
523     char *rados_str;
524     InetSocketAddressBaseList *p;
525     int i, cnt;
526 
527     if (!opts->has_server) {
528         return NULL;
529     }
530 
531     for (cnt = 0, p = opts->server; p; p = p->next) {
532         cnt++;
533     }
534 
535     vals = g_new(const char *, cnt + 1);
536 
537     for (i = 0, p = opts->server; p; p = p->next, i++) {
538         host = p->value->host;
539         port = p->value->port;
540 
541         if (strchr(host, ':')) {
542             vals[i] = g_strdup_printf("[%s]:%s", host, port);
543         } else {
544             vals[i] = g_strdup_printf("%s:%s", host, port);
545         }
546     }
547     vals[i] = NULL;
548 
549     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
550     g_strfreev((char **)vals);
551     return rados_str;
552 }
553 
554 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
555                             BlockdevOptionsRbd *opts, bool cache,
556                             const char *keypairs, const char *secretid,
557                             Error **errp)
558 {
559     char *mon_host = NULL;
560     Error *local_err = NULL;
561     int r;
562 
563     if (secretid) {
564         if (opts->key_secret) {
565             error_setg(errp,
566                        "Legacy 'password-secret' clashes with 'key-secret'");
567             return -EINVAL;
568         }
569         opts->key_secret = g_strdup(secretid);
570         opts->has_key_secret = true;
571     }
572 
573     mon_host = qemu_rbd_mon_host(opts, &local_err);
574     if (local_err) {
575         error_propagate(errp, local_err);
576         r = -EINVAL;
577         goto out;
578     }
579 
580     r = rados_create(cluster, opts->user);
581     if (r < 0) {
582         error_setg_errno(errp, -r, "error initializing");
583         goto out;
584     }
585 
586     /* try default location when conf=NULL, but ignore failure */
587     r = rados_conf_read_file(*cluster, opts->conf);
588     if (opts->has_conf && r < 0) {
589         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
590         goto failed_shutdown;
591     }
592 
593     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
594     if (r < 0) {
595         goto failed_shutdown;
596     }
597 
598     if (mon_host) {
599         r = rados_conf_set(*cluster, "mon_host", mon_host);
600         if (r < 0) {
601             goto failed_shutdown;
602         }
603     }
604 
605     r = qemu_rbd_set_auth(*cluster, opts, errp);
606     if (r < 0) {
607         goto failed_shutdown;
608     }
609 
610     /*
611      * Fallback to more conservative semantics if setting cache
612      * options fails. Ignore errors from setting rbd_cache because the
613      * only possible error is that the option does not exist, and
614      * librbd defaults to no caching. If write through caching cannot
615      * be set up, fall back to no caching.
616      */
617     if (cache) {
618         rados_conf_set(*cluster, "rbd_cache", "true");
619     } else {
620         rados_conf_set(*cluster, "rbd_cache", "false");
621     }
622 
623     r = rados_connect(*cluster);
624     if (r < 0) {
625         error_setg_errno(errp, -r, "error connecting");
626         goto failed_shutdown;
627     }
628 
629     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
630     if (r < 0) {
631         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
632         goto failed_shutdown;
633     }
634     /*
635      * Set the namespace after opening the io context on the pool,
636      * if nspace == NULL or if nspace == "", it is just as we did nothing
637      */
638     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
639 
640     r = 0;
641     goto out;
642 
643 failed_shutdown:
644     rados_shutdown(*cluster);
645 out:
646     g_free(mon_host);
647     return r;
648 }
649 
650 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
651                                     Error **errp)
652 {
653     Visitor *v;
654 
655     /* Convert the remaining options into a QAPI object */
656     v = qobject_input_visitor_new_flat_confused(options, errp);
657     if (!v) {
658         return -EINVAL;
659     }
660 
661     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
662     visit_free(v);
663     if (!opts) {
664         return -EINVAL;
665     }
666 
667     return 0;
668 }
669 
670 static int qemu_rbd_attempt_legacy_options(QDict *options,
671                                            BlockdevOptionsRbd **opts,
672                                            char **keypairs)
673 {
674     char *filename;
675     int r;
676 
677     filename = g_strdup(qdict_get_try_str(options, "filename"));
678     if (!filename) {
679         return -EINVAL;
680     }
681     qdict_del(options, "filename");
682 
683     qemu_rbd_parse_filename(filename, options, NULL);
684 
685     /* keypairs freed by caller */
686     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
687     if (*keypairs) {
688         qdict_del(options, "=keyvalue-pairs");
689     }
690 
691     r = qemu_rbd_convert_options(options, opts, NULL);
692 
693     g_free(filename);
694     return r;
695 }
696 
697 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
698                          Error **errp)
699 {
700     BDRVRBDState *s = bs->opaque;
701     BlockdevOptionsRbd *opts = NULL;
702     const QDictEntry *e;
703     Error *local_err = NULL;
704     char *keypairs, *secretid;
705     int r;
706 
707     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
708     if (keypairs) {
709         qdict_del(options, "=keyvalue-pairs");
710     }
711 
712     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
713     if (secretid) {
714         qdict_del(options, "password-secret");
715     }
716 
717     r = qemu_rbd_convert_options(options, &opts, &local_err);
718     if (local_err) {
719         /* If keypairs are present, that means some options are present in
720          * the modern option format.  Don't attempt to parse legacy option
721          * formats, as we won't support mixed usage. */
722         if (keypairs) {
723             error_propagate(errp, local_err);
724             goto out;
725         }
726 
727         /* If the initial attempt to convert and process the options failed,
728          * we may be attempting to open an image file that has the rbd options
729          * specified in the older format consisting of all key/value pairs
730          * encoded in the filename.  Go ahead and attempt to parse the
731          * filename, and see if we can pull out the required options. */
732         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
733         if (r < 0) {
734             /* Propagate the original error, not the legacy parsing fallback
735              * error, as the latter was just a best-effort attempt. */
736             error_propagate(errp, local_err);
737             goto out;
738         }
739         /* Take care whenever deciding to actually deprecate; once this ability
740          * is removed, we will not be able to open any images with legacy-styled
741          * backing image strings. */
742         warn_report("RBD options encoded in the filename as keyvalue pairs "
743                     "is deprecated");
744     }
745 
746     /* Remove the processed options from the QDict (the visitor processes
747      * _all_ options in the QDict) */
748     while ((e = qdict_first(options))) {
749         qdict_del(options, e->key);
750     }
751 
752     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
753                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
754     if (r < 0) {
755         goto out;
756     }
757 
758     s->snap = g_strdup(opts->snapshot);
759     s->image_name = g_strdup(opts->image);
760 
761     /* rbd_open is always r/w */
762     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
763     if (r < 0) {
764         error_setg_errno(errp, -r, "error reading header from %s",
765                          s->image_name);
766         goto failed_open;
767     }
768 
769     r = rbd_get_size(s->image, &s->image_size);
770     if (r < 0) {
771         error_setg_errno(errp, -r, "error getting image size from %s",
772                          s->image_name);
773         rbd_close(s->image);
774         goto failed_open;
775     }
776 
777     /* If we are using an rbd snapshot, we must be r/o, otherwise
778      * leave as-is */
779     if (s->snap != NULL) {
780         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
781         if (r < 0) {
782             rbd_close(s->image);
783             goto failed_open;
784         }
785     }
786 
787     /* When extending regular files, we get zeros from the OS */
788     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
789 
790     r = 0;
791     goto out;
792 
793 failed_open:
794     rados_ioctx_destroy(s->io_ctx);
795     g_free(s->snap);
796     g_free(s->image_name);
797     rados_shutdown(s->cluster);
798 out:
799     qapi_free_BlockdevOptionsRbd(opts);
800     g_free(keypairs);
801     g_free(secretid);
802     return r;
803 }
804 
805 
806 /* Since RBD is currently always opened R/W via the API,
807  * we just need to check if we are using a snapshot or not, in
808  * order to determine if we will allow it to be R/W */
809 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
810                                    BlockReopenQueue *queue, Error **errp)
811 {
812     BDRVRBDState *s = state->bs->opaque;
813     int ret = 0;
814 
815     if (s->snap && state->flags & BDRV_O_RDWR) {
816         error_setg(errp,
817                    "Cannot change node '%s' to r/w when using RBD snapshot",
818                    bdrv_get_device_or_node_name(state->bs));
819         ret = -EINVAL;
820     }
821 
822     return ret;
823 }
824 
825 static void qemu_rbd_close(BlockDriverState *bs)
826 {
827     BDRVRBDState *s = bs->opaque;
828 
829     rbd_close(s->image);
830     rados_ioctx_destroy(s->io_ctx);
831     g_free(s->snap);
832     g_free(s->image_name);
833     rados_shutdown(s->cluster);
834 }
835 
836 /* Resize the RBD image and update the 'image_size' with the current size */
837 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
838 {
839     BDRVRBDState *s = bs->opaque;
840     int r;
841 
842     r = rbd_resize(s->image, size);
843     if (r < 0) {
844         return r;
845     }
846 
847     s->image_size = size;
848 
849     return 0;
850 }
851 
852 static const AIOCBInfo rbd_aiocb_info = {
853     .aiocb_size = sizeof(RBDAIOCB),
854 };
855 
856 static void rbd_finish_bh(void *opaque)
857 {
858     RADOSCB *rcb = opaque;
859     qemu_rbd_complete_aio(rcb);
860 }
861 
862 /*
863  * This is the callback function for rbd_aio_read and _write
864  *
865  * Note: this function is being called from a non qemu thread so
866  * we need to be careful about what we do here. Generally we only
867  * schedule a BH, and do the rest of the io completion handling
868  * from rbd_finish_bh() which runs in a qemu context.
869  */
870 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
871 {
872     RBDAIOCB *acb = rcb->acb;
873 
874     rcb->ret = rbd_aio_get_return_value(c);
875     rbd_aio_release(c);
876 
877     replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
878                                      rbd_finish_bh, rcb);
879 }
880 
881 static int rbd_aio_discard_wrapper(rbd_image_t image,
882                                    uint64_t off,
883                                    uint64_t len,
884                                    rbd_completion_t comp)
885 {
886 #ifdef LIBRBD_SUPPORTS_DISCARD
887     return rbd_aio_discard(image, off, len, comp);
888 #else
889     return -ENOTSUP;
890 #endif
891 }
892 
893 static int rbd_aio_flush_wrapper(rbd_image_t image,
894                                  rbd_completion_t comp)
895 {
896 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
897     return rbd_aio_flush(image, comp);
898 #else
899     return -ENOTSUP;
900 #endif
901 }
902 
903 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
904                                  int64_t off,
905                                  QEMUIOVector *qiov,
906                                  int64_t size,
907                                  BlockCompletionFunc *cb,
908                                  void *opaque,
909                                  RBDAIOCmd cmd)
910 {
911     RBDAIOCB *acb;
912     RADOSCB *rcb = NULL;
913     rbd_completion_t c;
914     int r;
915 
916     BDRVRBDState *s = bs->opaque;
917 
918     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
919     acb->cmd = cmd;
920     acb->qiov = qiov;
921     assert(!qiov || qiov->size == size);
922 
923     rcb = g_new(RADOSCB, 1);
924 
925     if (!LIBRBD_USE_IOVEC) {
926         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
927             acb->bounce = NULL;
928         } else {
929             acb->bounce = qemu_try_blockalign(bs, qiov->size);
930             if (acb->bounce == NULL) {
931                 goto failed;
932             }
933         }
934         if (cmd == RBD_AIO_WRITE) {
935             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
936         }
937         rcb->buf = acb->bounce;
938     }
939 
940     acb->ret = 0;
941     acb->error = 0;
942     acb->s = s;
943 
944     rcb->acb = acb;
945     rcb->s = acb->s;
946     rcb->size = size;
947     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
948     if (r < 0) {
949         goto failed;
950     }
951 
952     switch (cmd) {
953     case RBD_AIO_WRITE: {
954         /*
955          * RBD APIs don't allow us to write more than actual size, so in order
956          * to support growing images, we resize the image before write
957          * operations that exceed the current size.
958          */
959         if (off + size > s->image_size) {
960             r = qemu_rbd_resize(bs, off + size);
961             if (r < 0) {
962                 goto failed_completion;
963             }
964         }
965 #ifdef LIBRBD_SUPPORTS_IOVEC
966             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
967 #else
968             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
969 #endif
970         break;
971     }
972     case RBD_AIO_READ:
973 #ifdef LIBRBD_SUPPORTS_IOVEC
974             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
975 #else
976             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
977 #endif
978         break;
979     case RBD_AIO_DISCARD:
980         r = rbd_aio_discard_wrapper(s->image, off, size, c);
981         break;
982     case RBD_AIO_FLUSH:
983         r = rbd_aio_flush_wrapper(s->image, c);
984         break;
985     default:
986         r = -EINVAL;
987     }
988 
989     if (r < 0) {
990         goto failed_completion;
991     }
992     return &acb->common;
993 
994 failed_completion:
995     rbd_aio_release(c);
996 failed:
997     g_free(rcb);
998     if (!LIBRBD_USE_IOVEC) {
999         qemu_vfree(acb->bounce);
1000     }
1001 
1002     qemu_aio_unref(acb);
1003     return NULL;
1004 }
1005 
1006 static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
1007                                        uint64_t offset, uint64_t bytes,
1008                                        QEMUIOVector *qiov, int flags,
1009                                        BlockCompletionFunc *cb,
1010                                        void *opaque)
1011 {
1012     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1013                          RBD_AIO_READ);
1014 }
1015 
1016 static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
1017                                         uint64_t offset, uint64_t bytes,
1018                                         QEMUIOVector *qiov, int flags,
1019                                         BlockCompletionFunc *cb,
1020                                         void *opaque)
1021 {
1022     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
1023                          RBD_AIO_WRITE);
1024 }
1025 
1026 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1027 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1028                                       BlockCompletionFunc *cb,
1029                                       void *opaque)
1030 {
1031     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1032 }
1033 
1034 #else
1035 
1036 static int qemu_rbd_co_flush(BlockDriverState *bs)
1037 {
1038 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1039     /* rbd_flush added in 0.1.1 */
1040     BDRVRBDState *s = bs->opaque;
1041     return rbd_flush(s->image);
1042 #else
1043     return 0;
1044 #endif
1045 }
1046 #endif
1047 
1048 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1049 {
1050     BDRVRBDState *s = bs->opaque;
1051     rbd_image_info_t info;
1052     int r;
1053 
1054     r = rbd_stat(s->image, &info, sizeof(info));
1055     if (r < 0) {
1056         return r;
1057     }
1058 
1059     bdi->cluster_size = info.obj_size;
1060     return 0;
1061 }
1062 
1063 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1064 {
1065     BDRVRBDState *s = bs->opaque;
1066     rbd_image_info_t info;
1067     int r;
1068 
1069     r = rbd_stat(s->image, &info, sizeof(info));
1070     if (r < 0) {
1071         return r;
1072     }
1073 
1074     return info.size;
1075 }
1076 
1077 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1078                                              int64_t offset,
1079                                              bool exact,
1080                                              PreallocMode prealloc,
1081                                              BdrvRequestFlags flags,
1082                                              Error **errp)
1083 {
1084     int r;
1085 
1086     if (prealloc != PREALLOC_MODE_OFF) {
1087         error_setg(errp, "Unsupported preallocation mode '%s'",
1088                    PreallocMode_str(prealloc));
1089         return -ENOTSUP;
1090     }
1091 
1092     r = qemu_rbd_resize(bs, offset);
1093     if (r < 0) {
1094         error_setg_errno(errp, -r, "Failed to resize file");
1095         return r;
1096     }
1097 
1098     return 0;
1099 }
1100 
1101 static int qemu_rbd_snap_create(BlockDriverState *bs,
1102                                 QEMUSnapshotInfo *sn_info)
1103 {
1104     BDRVRBDState *s = bs->opaque;
1105     int r;
1106 
1107     if (sn_info->name[0] == '\0') {
1108         return -EINVAL; /* we need a name for rbd snapshots */
1109     }
1110 
1111     /*
1112      * rbd snapshots are using the name as the user controlled unique identifier
1113      * we can't use the rbd snapid for that purpose, as it can't be set
1114      */
1115     if (sn_info->id_str[0] != '\0' &&
1116         strcmp(sn_info->id_str, sn_info->name) != 0) {
1117         return -EINVAL;
1118     }
1119 
1120     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1121         return -ERANGE;
1122     }
1123 
1124     r = rbd_snap_create(s->image, sn_info->name);
1125     if (r < 0) {
1126         error_report("failed to create snap: %s", strerror(-r));
1127         return r;
1128     }
1129 
1130     return 0;
1131 }
1132 
1133 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1134                                 const char *snapshot_id,
1135                                 const char *snapshot_name,
1136                                 Error **errp)
1137 {
1138     BDRVRBDState *s = bs->opaque;
1139     int r;
1140 
1141     if (!snapshot_name) {
1142         error_setg(errp, "rbd need a valid snapshot name");
1143         return -EINVAL;
1144     }
1145 
1146     /* If snapshot_id is specified, it must be equal to name, see
1147        qemu_rbd_snap_list() */
1148     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1149         error_setg(errp,
1150                    "rbd do not support snapshot id, it should be NULL or "
1151                    "equal to snapshot name");
1152         return -EINVAL;
1153     }
1154 
1155     r = rbd_snap_remove(s->image, snapshot_name);
1156     if (r < 0) {
1157         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1158     }
1159     return r;
1160 }
1161 
1162 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1163                                   const char *snapshot_name)
1164 {
1165     BDRVRBDState *s = bs->opaque;
1166 
1167     return rbd_snap_rollback(s->image, snapshot_name);
1168 }
1169 
1170 static int qemu_rbd_snap_list(BlockDriverState *bs,
1171                               QEMUSnapshotInfo **psn_tab)
1172 {
1173     BDRVRBDState *s = bs->opaque;
1174     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1175     int i, snap_count;
1176     rbd_snap_info_t *snaps;
1177     int max_snaps = RBD_MAX_SNAPS;
1178 
1179     do {
1180         snaps = g_new(rbd_snap_info_t, max_snaps);
1181         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1182         if (snap_count <= 0) {
1183             g_free(snaps);
1184         }
1185     } while (snap_count == -ERANGE);
1186 
1187     if (snap_count <= 0) {
1188         goto done;
1189     }
1190 
1191     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1192 
1193     for (i = 0; i < snap_count; i++) {
1194         const char *snap_name = snaps[i].name;
1195 
1196         sn_info = sn_tab + i;
1197         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1198         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1199 
1200         sn_info->vm_state_size = snaps[i].size;
1201         sn_info->date_sec = 0;
1202         sn_info->date_nsec = 0;
1203         sn_info->vm_clock_nsec = 0;
1204     }
1205     rbd_snap_list_end(snaps);
1206     g_free(snaps);
1207 
1208  done:
1209     *psn_tab = sn_tab;
1210     return snap_count;
1211 }
1212 
1213 #ifdef LIBRBD_SUPPORTS_DISCARD
1214 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1215                                          int64_t offset,
1216                                          int bytes,
1217                                          BlockCompletionFunc *cb,
1218                                          void *opaque)
1219 {
1220     return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
1221                          RBD_AIO_DISCARD);
1222 }
1223 #endif
1224 
1225 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1226 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1227                                                       Error **errp)
1228 {
1229     BDRVRBDState *s = bs->opaque;
1230     int r = rbd_invalidate_cache(s->image);
1231     if (r < 0) {
1232         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1233     }
1234 }
1235 #endif
1236 
1237 static QemuOptsList qemu_rbd_create_opts = {
1238     .name = "rbd-create-opts",
1239     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1240     .desc = {
1241         {
1242             .name = BLOCK_OPT_SIZE,
1243             .type = QEMU_OPT_SIZE,
1244             .help = "Virtual disk size"
1245         },
1246         {
1247             .name = BLOCK_OPT_CLUSTER_SIZE,
1248             .type = QEMU_OPT_SIZE,
1249             .help = "RBD object size"
1250         },
1251         {
1252             .name = "password-secret",
1253             .type = QEMU_OPT_STRING,
1254             .help = "ID of secret providing the password",
1255         },
1256         { /* end of list */ }
1257     }
1258 };
1259 
/*
 * NULL-terminated list of runtime option names, referenced by the
 * driver table as its strong_runtime_opts.
 * NOTE(review): the trailing dot in "server." presumably makes it match
 * every "server.*" sub-option -- confirm against the generic block code.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* what to open */
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",

    /* how to connect and authenticate */
    "user",
    "server.",
    "password-secret",

    NULL,
};
1272 
1273 static BlockDriver bdrv_rbd = {
1274     .format_name            = "rbd",
1275     .instance_size          = sizeof(BDRVRBDState),
1276     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1277     .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1278     .bdrv_file_open         = qemu_rbd_open,
1279     .bdrv_close             = qemu_rbd_close,
1280     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1281     .bdrv_co_create         = qemu_rbd_co_create,
1282     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1283     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1284     .bdrv_get_info          = qemu_rbd_getinfo,
1285     .create_opts            = &qemu_rbd_create_opts,
1286     .bdrv_getlength         = qemu_rbd_getlength,
1287     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1288     .protocol_name          = "rbd",
1289 
1290     .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1291     .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1292 
1293 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1294     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1295 #else
1296     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1297 #endif
1298 
1299 #ifdef LIBRBD_SUPPORTS_DISCARD
1300     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1301 #endif
1302 
1303     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1304     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1305     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1306     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1307 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1308     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1309 #endif
1310 
1311     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1312 };
1313 
1314 static void bdrv_rbd_init(void)
1315 {
1316     bdrv_register(&bdrv_rbd);
1317 }
1318 
1319 block_init(bdrv_rbd_init);
1320