xref: /qemu/block/rbd.c (revision 654d6b04)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "sysemu/replay.h"
26 #include "qapi/qmp/qstring.h"
27 #include "qapi/qmp/qdict.h"
28 #include "qapi/qmp/qjson.h"
29 #include "qapi/qmp/qlist.h"
30 #include "qapi/qobject-input-visitor.h"
31 #include "qapi/qapi-visit-block-core.h"
32 
33 /*
34  * When specifying the image filename use:
35  *
36  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
37  *
38  * poolname must be the name of an existing rados pool.
39  *
40  * devicename is the name of the rbd image.
41  *
42  * Each option given is used to configure rados, and may be any valid
43  * Ceph option, "id", or "conf".
44  *
45  * The "id" option indicates what user we should authenticate as to
46  * the Ceph cluster.  If it is excluded we will use the Ceph default
47  * (normally 'admin').
48  *
49  * The "conf" option specifies a Ceph configuration file to read.  If
50  * it is not specified, we will read from the default Ceph locations
51  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
52  * file, specify conf=/dev/null.
53  *
54  * Configuration values containing :, @, or = can be escaped with a
55  * leading "\".
56  */
57 
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file -- confirm it is still provided somewhere before using this macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* NOTE(review): presumably an upper bound on snapshots per image -- confirm. */
#define RBD_MAX_SNAPS 100

/* Length in bytes of the LUKS header signature arrays declared below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
63 
/*
 * Eight-byte signature: "LUKS", 0xBA, 0xBE, then the bytes 0 and 1.
 * NOTE(review): presumably the LUKS magic followed by format version 1 --
 * confirm against the code that compares it.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};
68 
/*
 * Eight-byte signature: "LUKS", 0xBA, 0xBE, then the bytes 0 and 2.
 * NOTE(review): presumably the LUKS magic followed by format version 2 --
 * confirm against the code that compares it.
 */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
73 
/*
 * Kinds of asynchronous request issued against an RBD image.
 * NOTE(review): the users of this enum are outside the visible part of the
 * file; the meaning of each value is taken from its name only.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
81 
/* Per-BlockDriverState driver state (stored in bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* pool I/O context set up by qemu_rbd_connect() */
    rbd_image_t image;      /* image handle obtained from rbd_open() */
    char *image_name;       /* heap copy of the "image" option */
    char *snap;             /* heap copy of the "snapshot" option, or NULL */
    char *namespace;        /* NOTE(review): not assigned in the visible code */
    uint64_t image_size;    /* info.size as reported by rbd_stat() at open */
    uint64_t object_size;   /* info.obj_size as reported by rbd_stat() at open */
} BDRVRBDState;
92 
/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): the code using this struct is outside the visible part of
 * the file; field roles below are inferred from names -- confirm.
 */
typedef struct RBDTask {
    BlockDriverState *bs;   /* device the request belongs to */
    Coroutine *co;          /* presumably the coroutine waiting for completion */
    bool complete;          /* presumably set once the request has finished */
    int64_t ret;            /* presumably the request's result code */
} RBDTask;
99 
/* Forward declaration: used by the image-creation code before its definition. */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
104 
/*
 * Locate the first occurrence of @delim in @src that is not escaped.
 *
 * A backslash followed by any character makes that following character
 * invisible to the search.  The delimiter test is done before the escape
 * test, so searching for a backslash itself finds the first one.
 *
 * Returns a pointer into @src, or NULL when no match exists.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* An escape consumes two characters, anything else just one. */
        cur += (*cur == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
120 
121 
/*
 * Split off the token at the start of @src, ending at the first unescaped
 * @delim.
 *
 * When a delimiter is found it is overwritten with NUL and *@p is set to the
 * character following it.  Otherwise the whole of @src is the token and *@p
 * is set to NULL.
 *
 * Returns @src, i.e. the start of the token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep == NULL) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
135 
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept verbatim.  A backslash that is the final
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
148 
149 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
150                                     Error **errp)
151 {
152     const char *start;
153     char *p, *buf;
154     QList *keypairs = NULL;
155     char *found_str, *image_name;
156 
157     if (!strstart(filename, "rbd:", &start)) {
158         error_setg(errp, "File name must start with 'rbd:'");
159         return;
160     }
161 
162     buf = g_strdup(start);
163     p = buf;
164 
165     found_str = qemu_rbd_next_tok(p, '/', &p);
166     if (!p) {
167         error_setg(errp, "Pool name is required");
168         goto done;
169     }
170     qemu_rbd_unescape(found_str);
171     qdict_put_str(options, "pool", found_str);
172 
173     if (qemu_rbd_strchr(p, '@')) {
174         image_name = qemu_rbd_next_tok(p, '@', &p);
175 
176         found_str = qemu_rbd_next_tok(p, ':', &p);
177         qemu_rbd_unescape(found_str);
178         qdict_put_str(options, "snapshot", found_str);
179     } else {
180         image_name = qemu_rbd_next_tok(p, ':', &p);
181     }
182     /* Check for namespace in the image_name */
183     if (qemu_rbd_strchr(image_name, '/')) {
184         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
185         qemu_rbd_unescape(found_str);
186         qdict_put_str(options, "namespace", found_str);
187     } else {
188         qdict_put_str(options, "namespace", "");
189     }
190     qemu_rbd_unescape(image_name);
191     qdict_put_str(options, "image", image_name);
192     if (!p) {
193         goto done;
194     }
195 
196     /* The following are essentially all key/value pairs, and we treat
197      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
198     while (p) {
199         char *name, *value;
200         name = qemu_rbd_next_tok(p, '=', &p);
201         if (!p) {
202             error_setg(errp, "conf option %s has no value", name);
203             break;
204         }
205 
206         qemu_rbd_unescape(name);
207 
208         value = qemu_rbd_next_tok(p, ':', &p);
209         qemu_rbd_unescape(value);
210 
211         if (!strcmp(name, "conf")) {
212             qdict_put_str(options, "conf", value);
213         } else if (!strcmp(name, "id")) {
214             qdict_put_str(options, "user", value);
215         } else {
216             /*
217              * We pass these internally to qemu_rbd_set_keypairs(), so
218              * we can get away with the simpler list of [ "key1",
219              * "value1", "key2", "value2" ] rather than a raw dict
220              * { "key1": "value1", "key2": "value2" } where we can't
221              * guarantee order, or even a more correct but complex
222              * [ { "key1": "value1" }, { "key2": "value2" } ]
223              */
224             if (!keypairs) {
225                 keypairs = qlist_new();
226             }
227             qlist_append_str(keypairs, name);
228             qlist_append_str(keypairs, value);
229         }
230     }
231 
232     if (keypairs) {
233         qdict_put(options, "=keyvalue-pairs",
234                   qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
235     }
236 
237 done:
238     g_free(buf);
239     qobject_unref(keypairs);
240     return;
241 }
242 
243 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
244                              Error **errp)
245 {
246     char *key, *acr;
247     int r;
248     GString *accu;
249     RbdAuthModeList *auth;
250 
251     if (opts->key_secret) {
252         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
253         if (!key) {
254             return -EIO;
255         }
256         r = rados_conf_set(cluster, "key", key);
257         g_free(key);
258         if (r < 0) {
259             error_setg_errno(errp, -r, "Could not set 'key'");
260             return r;
261         }
262     }
263 
264     if (opts->has_auth_client_required) {
265         accu = g_string_new("");
266         for (auth = opts->auth_client_required; auth; auth = auth->next) {
267             if (accu->str[0]) {
268                 g_string_append_c(accu, ';');
269             }
270             g_string_append(accu, RbdAuthMode_str(auth->value));
271         }
272         acr = g_string_free(accu, FALSE);
273         r = rados_conf_set(cluster, "auth_client_required", acr);
274         g_free(acr);
275         if (r < 0) {
276             error_setg_errno(errp, -r,
277                              "Could not set 'auth_client_required'");
278             return r;
279         }
280     }
281 
282     return 0;
283 }
284 
285 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
286                                  Error **errp)
287 {
288     QList *keypairs;
289     QString *name;
290     QString *value;
291     const char *key;
292     size_t remaining;
293     int ret = 0;
294 
295     if (!keypairs_json) {
296         return ret;
297     }
298     keypairs = qobject_to(QList,
299                           qobject_from_json(keypairs_json, &error_abort));
300     remaining = qlist_size(keypairs) / 2;
301     assert(remaining);
302 
303     while (remaining--) {
304         name = qobject_to(QString, qlist_pop(keypairs));
305         value = qobject_to(QString, qlist_pop(keypairs));
306         assert(name && value);
307         key = qstring_get_str(name);
308 
309         ret = rados_conf_set(cluster, key, qstring_get_str(value));
310         qobject_unref(value);
311         if (ret < 0) {
312             error_setg_errno(errp, -ret, "invalid conf option %s", key);
313             qobject_unref(name);
314             ret = -EINVAL;
315             break;
316         }
317         qobject_unref(name);
318     }
319 
320     qobject_unref(keypairs);
321     return ret;
322 }
323 
324 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
325 static int qemu_rbd_convert_luks_options(
326         RbdEncryptionOptionsLUKSBase *luks_opts,
327         char **passphrase,
328         size_t *passphrase_len,
329         Error **errp)
330 {
331     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
332                                  passphrase_len, errp);
333 }
334 
335 static int qemu_rbd_convert_luks_create_options(
336         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
337         rbd_encryption_algorithm_t *alg,
338         char **passphrase,
339         size_t *passphrase_len,
340         Error **errp)
341 {
342     int r = 0;
343 
344     r = qemu_rbd_convert_luks_options(
345             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
346             passphrase, passphrase_len, errp);
347     if (r < 0) {
348         return r;
349     }
350 
351     if (luks_opts->has_cipher_alg) {
352         switch (luks_opts->cipher_alg) {
353             case QCRYPTO_CIPHER_ALG_AES_128: {
354                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
355                 break;
356             }
357             case QCRYPTO_CIPHER_ALG_AES_256: {
358                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
359                 break;
360             }
361             default: {
362                 r = -ENOTSUP;
363                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
364                                  luks_opts->cipher_alg);
365                 return r;
366             }
367         }
368     } else {
369         /* default alg */
370         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
371     }
372 
373     return 0;
374 }
375 
376 static int qemu_rbd_encryption_format(rbd_image_t image,
377                                       RbdEncryptionCreateOptions *encrypt,
378                                       Error **errp)
379 {
380     int r = 0;
381     g_autofree char *passphrase = NULL;
382     size_t passphrase_len;
383     rbd_encryption_format_t format;
384     rbd_encryption_options_t opts;
385     rbd_encryption_luks1_format_options_t luks_opts;
386     rbd_encryption_luks2_format_options_t luks2_opts;
387     size_t opts_size;
388     uint64_t raw_size, effective_size;
389 
390     r = rbd_get_size(image, &raw_size);
391     if (r < 0) {
392         error_setg_errno(errp, -r, "cannot get raw image size");
393         return r;
394     }
395 
396     switch (encrypt->format) {
397         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
398             memset(&luks_opts, 0, sizeof(luks_opts));
399             format = RBD_ENCRYPTION_FORMAT_LUKS1;
400             opts = &luks_opts;
401             opts_size = sizeof(luks_opts);
402             r = qemu_rbd_convert_luks_create_options(
403                     qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
404                     &luks_opts.alg, &passphrase, &passphrase_len, errp);
405             if (r < 0) {
406                 return r;
407             }
408             luks_opts.passphrase = passphrase;
409             luks_opts.passphrase_size = passphrase_len;
410             break;
411         }
412         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
413             memset(&luks2_opts, 0, sizeof(luks2_opts));
414             format = RBD_ENCRYPTION_FORMAT_LUKS2;
415             opts = &luks2_opts;
416             opts_size = sizeof(luks2_opts);
417             r = qemu_rbd_convert_luks_create_options(
418                     qapi_RbdEncryptionCreateOptionsLUKS2_base(
419                             &encrypt->u.luks2),
420                     &luks2_opts.alg, &passphrase, &passphrase_len, errp);
421             if (r < 0) {
422                 return r;
423             }
424             luks2_opts.passphrase = passphrase;
425             luks2_opts.passphrase_size = passphrase_len;
426             break;
427         }
428         default: {
429             r = -ENOTSUP;
430             error_setg_errno(
431                     errp, -r, "unknown image encryption format: %u",
432                     encrypt->format);
433             return r;
434         }
435     }
436 
437     r = rbd_encryption_format(image, format, opts, opts_size);
438     if (r < 0) {
439         error_setg_errno(errp, -r, "encryption format fail");
440         return r;
441     }
442 
443     r = rbd_get_size(image, &effective_size);
444     if (r < 0) {
445         error_setg_errno(errp, -r, "cannot get effective image size");
446         return r;
447     }
448 
449     r = rbd_resize(image, raw_size + (raw_size - effective_size));
450     if (r < 0) {
451         error_setg_errno(errp, -r, "cannot resize image after format");
452         return r;
453     }
454 
455     return 0;
456 }
457 
458 static int qemu_rbd_encryption_load(rbd_image_t image,
459                                     RbdEncryptionOptions *encrypt,
460                                     Error **errp)
461 {
462     int r = 0;
463     g_autofree char *passphrase = NULL;
464     size_t passphrase_len;
465     rbd_encryption_luks1_format_options_t luks_opts;
466     rbd_encryption_luks2_format_options_t luks2_opts;
467     rbd_encryption_format_t format;
468     rbd_encryption_options_t opts;
469     size_t opts_size;
470 
471     switch (encrypt->format) {
472         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
473             memset(&luks_opts, 0, sizeof(luks_opts));
474             format = RBD_ENCRYPTION_FORMAT_LUKS1;
475             opts = &luks_opts;
476             opts_size = sizeof(luks_opts);
477             r = qemu_rbd_convert_luks_options(
478                     qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
479                     &passphrase, &passphrase_len, errp);
480             if (r < 0) {
481                 return r;
482             }
483             luks_opts.passphrase = passphrase;
484             luks_opts.passphrase_size = passphrase_len;
485             break;
486         }
487         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
488             memset(&luks2_opts, 0, sizeof(luks2_opts));
489             format = RBD_ENCRYPTION_FORMAT_LUKS2;
490             opts = &luks2_opts;
491             opts_size = sizeof(luks2_opts);
492             r = qemu_rbd_convert_luks_options(
493                     qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
494                     &passphrase, &passphrase_len, errp);
495             if (r < 0) {
496                 return r;
497             }
498             luks2_opts.passphrase = passphrase;
499             luks2_opts.passphrase_size = passphrase_len;
500             break;
501         }
502         default: {
503             r = -ENOTSUP;
504             error_setg_errno(
505                     errp, -r, "unknown image encryption format: %u",
506                     encrypt->format);
507             return r;
508         }
509     }
510 
511     r = rbd_encryption_load(image, format, opts, opts_size);
512     if (r < 0) {
513         error_setg_errno(errp, -r, "encryption load fail");
514         return r;
515     }
516 
517     return 0;
518 }
519 #endif
520 
521 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
522 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
523                               const char *keypairs, const char *password_secret,
524                               Error **errp)
525 {
526     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
527     rados_t cluster;
528     rados_ioctx_t io_ctx;
529     int obj_order = 0;
530     int ret;
531 
532     assert(options->driver == BLOCKDEV_DRIVER_RBD);
533     if (opts->location->has_snapshot) {
534         error_setg(errp, "Can't use snapshot name for image creation");
535         return -EINVAL;
536     }
537 
538 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
539     if (opts->has_encrypt) {
540         error_setg(errp, "RBD library does not support image encryption");
541         return -ENOTSUP;
542     }
543 #endif
544 
545     if (opts->has_cluster_size) {
546         int64_t objsize = opts->cluster_size;
547         if ((objsize - 1) & objsize) {    /* not a power of 2? */
548             error_setg(errp, "obj size needs to be power of 2");
549             return -EINVAL;
550         }
551         if (objsize < 4096) {
552             error_setg(errp, "obj size too small");
553             return -EINVAL;
554         }
555         obj_order = ctz32(objsize);
556     }
557 
558     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
559                            password_secret, errp);
560     if (ret < 0) {
561         return ret;
562     }
563 
564     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
565     if (ret < 0) {
566         error_setg_errno(errp, -ret, "error rbd create");
567         goto out;
568     }
569 
570 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
571     if (opts->has_encrypt) {
572         rbd_image_t image;
573 
574         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
575         if (ret < 0) {
576             error_setg_errno(errp, -ret,
577                              "error opening image '%s' for encryption format",
578                              opts->location->image);
579             goto out;
580         }
581 
582         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
583         rbd_close(image);
584         if (ret < 0) {
585             /* encryption format fail, try removing the image */
586             rbd_remove(io_ctx, opts->location->image);
587             goto out;
588         }
589     }
590 #endif
591 
592     ret = 0;
593 out:
594     rados_ioctx_destroy(io_ctx);
595     rados_shutdown(cluster);
596     return ret;
597 }
598 
599 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
600 {
601     return qemu_rbd_do_create(options, NULL, NULL, errp);
602 }
603 
604 static int qemu_rbd_extract_encryption_create_options(
605         QemuOpts *opts,
606         RbdEncryptionCreateOptions **spec,
607         Error **errp)
608 {
609     QDict *opts_qdict;
610     QDict *encrypt_qdict;
611     Visitor *v;
612     int ret = 0;
613 
614     opts_qdict = qemu_opts_to_qdict(opts, NULL);
615     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
616     qobject_unref(opts_qdict);
617     if (!qdict_size(encrypt_qdict)) {
618         *spec = NULL;
619         goto exit;
620     }
621 
622     /* Convert options into a QAPI object */
623     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
624     if (!v) {
625         ret = -EINVAL;
626         goto exit;
627     }
628 
629     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
630     visit_free(v);
631     if (!*spec) {
632         ret = -EINVAL;
633         goto exit;
634     }
635 
636 exit:
637     qobject_unref(encrypt_qdict);
638     return ret;
639 }
640 
641 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
642                                                 const char *filename,
643                                                 QemuOpts *opts,
644                                                 Error **errp)
645 {
646     BlockdevCreateOptions *create_options;
647     BlockdevCreateOptionsRbd *rbd_opts;
648     BlockdevOptionsRbd *loc;
649     RbdEncryptionCreateOptions *encrypt = NULL;
650     Error *local_err = NULL;
651     const char *keypairs, *password_secret;
652     QDict *options = NULL;
653     int ret = 0;
654 
655     create_options = g_new0(BlockdevCreateOptions, 1);
656     create_options->driver = BLOCKDEV_DRIVER_RBD;
657     rbd_opts = &create_options->u.rbd;
658 
659     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
660 
661     password_secret = qemu_opt_get(opts, "password-secret");
662 
663     /* Read out options */
664     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
665                               BDRV_SECTOR_SIZE);
666     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
667                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
668     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
669 
670     options = qdict_new();
671     qemu_rbd_parse_filename(filename, options, &local_err);
672     if (local_err) {
673         ret = -EINVAL;
674         error_propagate(errp, local_err);
675         goto exit;
676     }
677 
678     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
679     if (ret < 0) {
680         goto exit;
681     }
682     rbd_opts->encrypt     = encrypt;
683     rbd_opts->has_encrypt = !!encrypt;
684 
685     /*
686      * Caution: while qdict_get_try_str() is fine, getting non-string
687      * types would require more care.  When @options come from -blockdev
688      * or blockdev_add, its members are typed according to the QAPI
689      * schema, but when they come from -drive, they're all QString.
690      */
691     loc = rbd_opts->location;
692     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
693     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
694     loc->has_conf    = !!loc->conf;
695     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
696     loc->has_user    = !!loc->user;
697     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
698     loc->has_q_namespace = !!loc->q_namespace;
699     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
700     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
701 
702     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
703     if (ret < 0) {
704         goto exit;
705     }
706 
707 exit:
708     qobject_unref(options);
709     qapi_free_BlockdevCreateOptions(create_options);
710     return ret;
711 }
712 
713 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
714 {
715     const char **vals;
716     const char *host, *port;
717     char *rados_str;
718     InetSocketAddressBaseList *p;
719     int i, cnt;
720 
721     if (!opts->has_server) {
722         return NULL;
723     }
724 
725     for (cnt = 0, p = opts->server; p; p = p->next) {
726         cnt++;
727     }
728 
729     vals = g_new(const char *, cnt + 1);
730 
731     for (i = 0, p = opts->server; p; p = p->next, i++) {
732         host = p->value->host;
733         port = p->value->port;
734 
735         if (strchr(host, ':')) {
736             vals[i] = g_strdup_printf("[%s]:%s", host, port);
737         } else {
738             vals[i] = g_strdup_printf("%s:%s", host, port);
739         }
740     }
741     vals[i] = NULL;
742 
743     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
744     g_strfreev((char **)vals);
745     return rados_str;
746 }
747 
748 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
749                             BlockdevOptionsRbd *opts, bool cache,
750                             const char *keypairs, const char *secretid,
751                             Error **errp)
752 {
753     char *mon_host = NULL;
754     Error *local_err = NULL;
755     int r;
756 
757     if (secretid) {
758         if (opts->key_secret) {
759             error_setg(errp,
760                        "Legacy 'password-secret' clashes with 'key-secret'");
761             return -EINVAL;
762         }
763         opts->key_secret = g_strdup(secretid);
764         opts->has_key_secret = true;
765     }
766 
767     mon_host = qemu_rbd_mon_host(opts, &local_err);
768     if (local_err) {
769         error_propagate(errp, local_err);
770         r = -EINVAL;
771         goto out;
772     }
773 
774     r = rados_create(cluster, opts->user);
775     if (r < 0) {
776         error_setg_errno(errp, -r, "error initializing");
777         goto out;
778     }
779 
780     /* try default location when conf=NULL, but ignore failure */
781     r = rados_conf_read_file(*cluster, opts->conf);
782     if (opts->has_conf && r < 0) {
783         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
784         goto failed_shutdown;
785     }
786 
787     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
788     if (r < 0) {
789         goto failed_shutdown;
790     }
791 
792     if (mon_host) {
793         r = rados_conf_set(*cluster, "mon_host", mon_host);
794         if (r < 0) {
795             goto failed_shutdown;
796         }
797     }
798 
799     r = qemu_rbd_set_auth(*cluster, opts, errp);
800     if (r < 0) {
801         goto failed_shutdown;
802     }
803 
804     /*
805      * Fallback to more conservative semantics if setting cache
806      * options fails. Ignore errors from setting rbd_cache because the
807      * only possible error is that the option does not exist, and
808      * librbd defaults to no caching. If write through caching cannot
809      * be set up, fall back to no caching.
810      */
811     if (cache) {
812         rados_conf_set(*cluster, "rbd_cache", "true");
813     } else {
814         rados_conf_set(*cluster, "rbd_cache", "false");
815     }
816 
817     r = rados_connect(*cluster);
818     if (r < 0) {
819         error_setg_errno(errp, -r, "error connecting");
820         goto failed_shutdown;
821     }
822 
823     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
824     if (r < 0) {
825         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
826         goto failed_shutdown;
827     }
828     /*
829      * Set the namespace after opening the io context on the pool,
830      * if nspace == NULL or if nspace == "", it is just as we did nothing
831      */
832     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
833 
834     r = 0;
835     goto out;
836 
837 failed_shutdown:
838     rados_shutdown(*cluster);
839 out:
840     g_free(mon_host);
841     return r;
842 }
843 
844 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
845                                     Error **errp)
846 {
847     Visitor *v;
848 
849     /* Convert the remaining options into a QAPI object */
850     v = qobject_input_visitor_new_flat_confused(options, errp);
851     if (!v) {
852         return -EINVAL;
853     }
854 
855     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
856     visit_free(v);
857     if (!opts) {
858         return -EINVAL;
859     }
860 
861     return 0;
862 }
863 
864 static int qemu_rbd_attempt_legacy_options(QDict *options,
865                                            BlockdevOptionsRbd **opts,
866                                            char **keypairs)
867 {
868     char *filename;
869     int r;
870 
871     filename = g_strdup(qdict_get_try_str(options, "filename"));
872     if (!filename) {
873         return -EINVAL;
874     }
875     qdict_del(options, "filename");
876 
877     qemu_rbd_parse_filename(filename, options, NULL);
878 
879     /* keypairs freed by caller */
880     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
881     if (*keypairs) {
882         qdict_del(options, "=keyvalue-pairs");
883     }
884 
885     r = qemu_rbd_convert_options(options, opts, NULL);
886 
887     g_free(filename);
888     return r;
889 }
890 
891 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
892                          Error **errp)
893 {
894     BDRVRBDState *s = bs->opaque;
895     BlockdevOptionsRbd *opts = NULL;
896     const QDictEntry *e;
897     Error *local_err = NULL;
898     char *keypairs, *secretid;
899     rbd_image_info_t info;
900     int r;
901 
902     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
903     if (keypairs) {
904         qdict_del(options, "=keyvalue-pairs");
905     }
906 
907     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
908     if (secretid) {
909         qdict_del(options, "password-secret");
910     }
911 
912     r = qemu_rbd_convert_options(options, &opts, &local_err);
913     if (local_err) {
914         /* If keypairs are present, that means some options are present in
915          * the modern option format.  Don't attempt to parse legacy option
916          * formats, as we won't support mixed usage. */
917         if (keypairs) {
918             error_propagate(errp, local_err);
919             goto out;
920         }
921 
922         /* If the initial attempt to convert and process the options failed,
923          * we may be attempting to open an image file that has the rbd options
924          * specified in the older format consisting of all key/value pairs
925          * encoded in the filename.  Go ahead and attempt to parse the
926          * filename, and see if we can pull out the required options. */
927         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
928         if (r < 0) {
929             /* Propagate the original error, not the legacy parsing fallback
930              * error, as the latter was just a best-effort attempt. */
931             error_propagate(errp, local_err);
932             goto out;
933         }
934         /* Take care whenever deciding to actually deprecate; once this ability
935          * is removed, we will not be able to open any images with legacy-styled
936          * backing image strings. */
937         warn_report("RBD options encoded in the filename as keyvalue pairs "
938                     "is deprecated");
939     }
940 
941     /* Remove the processed options from the QDict (the visitor processes
942      * _all_ options in the QDict) */
943     while ((e = qdict_first(options))) {
944         qdict_del(options, e->key);
945     }
946 
947     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
948                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
949     if (r < 0) {
950         goto out;
951     }
952 
953     s->snap = g_strdup(opts->snapshot);
954     s->image_name = g_strdup(opts->image);
955 
956     /* rbd_open is always r/w */
957     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
958     if (r < 0) {
959         error_setg_errno(errp, -r, "error reading header from %s",
960                          s->image_name);
961         goto failed_open;
962     }
963 
964     if (opts->has_encrypt) {
965 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
966         r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
967         if (r < 0) {
968             goto failed_post_open;
969         }
970 #else
971         r = -ENOTSUP;
972         error_setg(errp, "RBD library does not support image encryption");
973         goto failed_post_open;
974 #endif
975     }
976 
977     r = rbd_stat(s->image, &info, sizeof(info));
978     if (r < 0) {
979         error_setg_errno(errp, -r, "error getting image info from %s",
980                          s->image_name);
981         goto failed_post_open;
982     }
983     s->image_size = info.size;
984     s->object_size = info.obj_size;
985 
986     /* If we are using an rbd snapshot, we must be r/o, otherwise
987      * leave as-is */
988     if (s->snap != NULL) {
989         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
990         if (r < 0) {
991             goto failed_post_open;
992         }
993     }
994 
995 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
996     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
997 #endif
998 
999     /* When extending regular files, we get zeros from the OS */
1000     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1001 
1002     r = 0;
1003     goto out;
1004 
1005 failed_post_open:
1006     rbd_close(s->image);
1007 failed_open:
1008     rados_ioctx_destroy(s->io_ctx);
1009     g_free(s->snap);
1010     g_free(s->image_name);
1011     rados_shutdown(s->cluster);
1012 out:
1013     qapi_free_BlockdevOptionsRbd(opts);
1014     g_free(keypairs);
1015     g_free(secretid);
1016     return r;
1017 }
1018 
1019 
1020 /* Since RBD is currently always opened R/W via the API,
1021  * we just need to check if we are using a snapshot or not, in
1022  * order to determine if we will allow it to be R/W */
1023 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1024                                    BlockReopenQueue *queue, Error **errp)
1025 {
1026     BDRVRBDState *s = state->bs->opaque;
1027     int ret = 0;
1028 
1029     if (s->snap && state->flags & BDRV_O_RDWR) {
1030         error_setg(errp,
1031                    "Cannot change node '%s' to r/w when using RBD snapshot",
1032                    bdrv_get_device_or_node_name(state->bs));
1033         ret = -EINVAL;
1034     }
1035 
1036     return ret;
1037 }
1038 
1039 static void qemu_rbd_close(BlockDriverState *bs)
1040 {
1041     BDRVRBDState *s = bs->opaque;
1042 
1043     rbd_close(s->image);
1044     rados_ioctx_destroy(s->io_ctx);
1045     g_free(s->snap);
1046     g_free(s->image_name);
1047     rados_shutdown(s->cluster);
1048 }
1049 
1050 /* Resize the RBD image and update the 'image_size' with the current size */
1051 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1052 {
1053     BDRVRBDState *s = bs->opaque;
1054     int r;
1055 
1056     r = rbd_resize(s->image, size);
1057     if (r < 0) {
1058         return r;
1059     }
1060 
1061     s->image_size = size;
1062 
1063     return 0;
1064 }
1065 
/*
 * Bottom half scheduled by qemu_rbd_completion_cb() once an rbd aio
 * request has finished.
 *
 * @opaque is the RBDTask of the request.  The task is flagged as complete
 * before its coroutine is woken, so that the wait loop in
 * qemu_rbd_start_co() sees the flag when it resumes.
 */
static void qemu_rbd_finish_bh(void *opaque)
{
    RBDTask *task = opaque;
    task->complete = true;
    aio_co_wake(task->co);
}
1072 
1073 /*
1074  * This is the completion callback function for all rbd aio calls
1075  * started from qemu_rbd_start_co().
1076  *
1077  * Note: this function is being called from a non qemu thread so
1078  * we need to be careful about what we do here. Generally we only
1079  * schedule a BH, and do the rest of the io completion handling
1080  * from qemu_rbd_finish_bh() which runs in a qemu context.
1081  */
1082 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1083 {
1084     task->ret = rbd_aio_get_return_value(c);
1085     rbd_aio_release(c);
1086     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1087                             qemu_rbd_finish_bh, task);
1088 }
1089 
1090 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1091                                           uint64_t offset,
1092                                           uint64_t bytes,
1093                                           QEMUIOVector *qiov,
1094                                           int flags,
1095                                           RBDAIOCmd cmd)
1096 {
1097     BDRVRBDState *s = bs->opaque;
1098     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1099     rbd_completion_t c;
1100     int r;
1101 
1102     assert(!qiov || qiov->size == bytes);
1103 
1104     r = rbd_aio_create_completion(&task,
1105                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1106     if (r < 0) {
1107         return r;
1108     }
1109 
1110     switch (cmd) {
1111     case RBD_AIO_READ:
1112         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1113         break;
1114     case RBD_AIO_WRITE:
1115         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1116         break;
1117     case RBD_AIO_DISCARD:
1118         r = rbd_aio_discard(s->image, offset, bytes, c);
1119         break;
1120     case RBD_AIO_FLUSH:
1121         r = rbd_aio_flush(s->image, c);
1122         break;
1123 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1124     case RBD_AIO_WRITE_ZEROES: {
1125         int zero_flags = 0;
1126 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1127         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1128             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1129         }
1130 #endif
1131         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1132         break;
1133     }
1134 #endif
1135     default:
1136         r = -EINVAL;
1137     }
1138 
1139     if (r < 0) {
1140         error_report("rbd request failed early: cmd %d offset %" PRIu64
1141                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1142                      bytes, flags, r, strerror(-r));
1143         rbd_aio_release(c);
1144         return r;
1145     }
1146 
1147     while (!task.complete) {
1148         qemu_coroutine_yield();
1149     }
1150 
1151     if (task.ret < 0) {
1152         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1153                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1154                      bytes, flags, task.ret, strerror(-task.ret));
1155         return task.ret;
1156     }
1157 
1158     /* zero pad short reads */
1159     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1160         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1161     }
1162 
1163     return 0;
1164 }
1165 
/*
 * Read @bytes bytes starting at @offset into @qiov.
 *
 * Thin wrapper that issues an RBD_AIO_READ request through
 * qemu_rbd_start_co() and returns its result (0 on success, negative on
 * failure).  Short reads are zero padded by qemu_rbd_start_co().
 */
static int
coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
                                int64_t bytes, QEMUIOVector *qiov,
                                BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
}
1173 
1174 static int
1175 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1176                                  int64_t bytes, QEMUIOVector *qiov,
1177                                  BdrvRequestFlags flags)
1178 {
1179     BDRVRBDState *s = bs->opaque;
1180     /*
1181      * RBD APIs don't allow us to write more than actual size, so in order
1182      * to support growing images, we resize the image before write
1183      * operations that exceed the current size.
1184      */
1185     if (offset + bytes > s->image_size) {
1186         int r = qemu_rbd_resize(bs, offset + bytes);
1187         if (r < 0) {
1188             return r;
1189         }
1190     }
1191     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1192 }
1193 
/*
 * Flush the image by issuing an RBD_AIO_FLUSH request through
 * qemu_rbd_start_co().  A flush has no range, buffer or flags, hence the
 * 0/NULL arguments.  Returns 0 on success or a negative value on failure.
 */
static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
{
    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
}
1198 
/*
 * Discard @bytes bytes starting at @offset by issuing an RBD_AIO_DISCARD
 * request through qemu_rbd_start_co().  No data buffer or flags are
 * involved.  Returns 0 on success or a negative value on failure.
 */
static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
                                             int64_t offset, int64_t bytes)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
}
1204 
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes to @bytes bytes starting at @offset.
 *
 * Like regular writes, RBD does not allow zeroing beyond the current image
 * size.  To keep growing images working (the same reason
 * qemu_rbd_co_pwritev() resizes), the image is extended first whenever the
 * request would end past the cached image size.
 *
 * @flags is forwarded to qemu_rbd_start_co(), which requests thick
 * provisioning when BDRV_REQ_MAY_UNMAP is absent and the library supports
 * it.
 *
 * Returns 0 on success, or a negative value if either the resize or the
 * write-zeroes request fails.
 */
static int
coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, BdrvRequestFlags flags)
{
    BDRVRBDState *s = bs->opaque;
    uint64_t end = offset + bytes;

    if (end > s->image_size) {
        int r = qemu_rbd_resize(bs, end);

        if (r < 0) {
            return r;
        }
    }

    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                             RBD_AIO_WRITE_ZEROES);
}
#endif
1214 
/*
 * Fill in driver information for the node: the cluster size reported in
 * @bdi is the RBD object size cached in the driver state.  Always
 * returns 0.
 */
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *s = bs->opaque;
    bdi->cluster_size = s->object_size;
    return 0;
}
1221 
1222 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1223                                                      Error **errp)
1224 {
1225     BDRVRBDState *s = bs->opaque;
1226     ImageInfoSpecific *spec_info;
1227     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1228     int r;
1229 
1230     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1231         r = rbd_read(s->image, 0,
1232                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1233         if (r < 0) {
1234             error_setg_errno(errp, -r, "cannot read image start for probe");
1235             return NULL;
1236         }
1237     }
1238 
1239     spec_info = g_new(ImageInfoSpecific, 1);
1240     *spec_info = (ImageInfoSpecific){
1241         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1242         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1243     };
1244 
1245     if (memcmp(buf, rbd_luks_header_verification,
1246                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1247         spec_info->u.rbd.data->encryption_format =
1248                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1249         spec_info->u.rbd.data->has_encryption_format = true;
1250     } else if (memcmp(buf, rbd_luks2_header_verification,
1251                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1252         spec_info->u.rbd.data->encryption_format =
1253                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1254         spec_info->u.rbd.data->has_encryption_format = true;
1255     } else {
1256         spec_info->u.rbd.data->has_encryption_format = false;
1257     }
1258 
1259     return spec_info;
1260 }
1261 
1262 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1263 {
1264     BDRVRBDState *s = bs->opaque;
1265     int r;
1266 
1267     r = rbd_get_size(s->image, &s->image_size);
1268     if (r < 0) {
1269         return r;
1270     }
1271 
1272     return s->image_size;
1273 }
1274 
1275 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1276                                              int64_t offset,
1277                                              bool exact,
1278                                              PreallocMode prealloc,
1279                                              BdrvRequestFlags flags,
1280                                              Error **errp)
1281 {
1282     int r;
1283 
1284     if (prealloc != PREALLOC_MODE_OFF) {
1285         error_setg(errp, "Unsupported preallocation mode '%s'",
1286                    PreallocMode_str(prealloc));
1287         return -ENOTSUP;
1288     }
1289 
1290     r = qemu_rbd_resize(bs, offset);
1291     if (r < 0) {
1292         error_setg_errno(errp, -r, "Failed to resize file");
1293         return r;
1294     }
1295 
1296     return 0;
1297 }
1298 
1299 static int qemu_rbd_snap_create(BlockDriverState *bs,
1300                                 QEMUSnapshotInfo *sn_info)
1301 {
1302     BDRVRBDState *s = bs->opaque;
1303     int r;
1304 
1305     if (sn_info->name[0] == '\0') {
1306         return -EINVAL; /* we need a name for rbd snapshots */
1307     }
1308 
1309     /*
1310      * rbd snapshots are using the name as the user controlled unique identifier
1311      * we can't use the rbd snapid for that purpose, as it can't be set
1312      */
1313     if (sn_info->id_str[0] != '\0' &&
1314         strcmp(sn_info->id_str, sn_info->name) != 0) {
1315         return -EINVAL;
1316     }
1317 
1318     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1319         return -ERANGE;
1320     }
1321 
1322     r = rbd_snap_create(s->image, sn_info->name);
1323     if (r < 0) {
1324         error_report("failed to create snap: %s", strerror(-r));
1325         return r;
1326     }
1327 
1328     return 0;
1329 }
1330 
1331 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1332                                 const char *snapshot_id,
1333                                 const char *snapshot_name,
1334                                 Error **errp)
1335 {
1336     BDRVRBDState *s = bs->opaque;
1337     int r;
1338 
1339     if (!snapshot_name) {
1340         error_setg(errp, "rbd need a valid snapshot name");
1341         return -EINVAL;
1342     }
1343 
1344     /* If snapshot_id is specified, it must be equal to name, see
1345        qemu_rbd_snap_list() */
1346     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1347         error_setg(errp,
1348                    "rbd do not support snapshot id, it should be NULL or "
1349                    "equal to snapshot name");
1350         return -EINVAL;
1351     }
1352 
1353     r = rbd_snap_remove(s->image, snapshot_name);
1354     if (r < 0) {
1355         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1356     }
1357     return r;
1358 }
1359 
/*
 * Roll the image back to the snapshot called @snapshot_name.
 *
 * Returns the result of rbd_snap_rollback() unchanged; no additional
 * error reporting is done here.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *s = bs->opaque;

    return rbd_snap_rollback(s->image, snapshot_name);
}
1367 
1368 static int qemu_rbd_snap_list(BlockDriverState *bs,
1369                               QEMUSnapshotInfo **psn_tab)
1370 {
1371     BDRVRBDState *s = bs->opaque;
1372     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1373     int i, snap_count;
1374     rbd_snap_info_t *snaps;
1375     int max_snaps = RBD_MAX_SNAPS;
1376 
1377     do {
1378         snaps = g_new(rbd_snap_info_t, max_snaps);
1379         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1380         if (snap_count <= 0) {
1381             g_free(snaps);
1382         }
1383     } while (snap_count == -ERANGE);
1384 
1385     if (snap_count <= 0) {
1386         goto done;
1387     }
1388 
1389     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1390 
1391     for (i = 0; i < snap_count; i++) {
1392         const char *snap_name = snaps[i].name;
1393 
1394         sn_info = sn_tab + i;
1395         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1396         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1397 
1398         sn_info->vm_state_size = snaps[i].size;
1399         sn_info->date_sec = 0;
1400         sn_info->date_nsec = 0;
1401         sn_info->vm_clock_nsec = 0;
1402     }
1403     rbd_snap_list_end(snaps);
1404     g_free(snaps);
1405 
1406  done:
1407     *psn_tab = sn_tab;
1408     return snap_count;
1409 }
1410 
1411 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1412                                                       Error **errp)
1413 {
1414     BDRVRBDState *s = bs->opaque;
1415     int r = rbd_invalidate_cache(s->image);
1416     if (r < 0) {
1417         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1418     }
1419 }
1420 
/*
 * Options understood when creating an RBD image.  The list is published
 * through the 'create_opts' member of the bdrv_rbd driver definition.
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            /* Size of the new image */
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            /* The generic cluster size option selects the RBD object size */
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            /* Names a secret object, not the password itself */
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        {
            /* Image encryption settings, all under the "encrypt." prefix */
            .name = "encrypt.format",
            .type = QEMU_OPT_STRING,
            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
        },
        {
            .name = "encrypt.cipher-alg",
            .type = QEMU_OPT_STRING,
            .help = "Name of encryption cipher algorithm"
                    " (allowed values: aes-128, aes-256)",
        },
        {
            /* Names a secret object holding the LUKS passphrase */
            .name = "encrypt.key-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing LUKS passphrase",
        },
        { /* end of list */ }
    }
};
1459 
/*
 * NULL-terminated list of runtime option names published through the
 * 'strong_runtime_opts' member of the bdrv_rbd driver definition.
 *
 * NOTE(review): "server." ends in a dot and so looks like a prefix that
 * matches every "server.*" option -- confirm against the block layer code
 * that consumes this list.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1472 
1473 static BlockDriver bdrv_rbd = {
1474     .format_name            = "rbd",
1475     .instance_size          = sizeof(BDRVRBDState),
1476     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1477     .bdrv_file_open         = qemu_rbd_open,
1478     .bdrv_close             = qemu_rbd_close,
1479     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1480     .bdrv_co_create         = qemu_rbd_co_create,
1481     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1482     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1483     .bdrv_get_info          = qemu_rbd_getinfo,
1484     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1485     .create_opts            = &qemu_rbd_create_opts,
1486     .bdrv_getlength         = qemu_rbd_getlength,
1487     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1488     .protocol_name          = "rbd",
1489 
1490     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1491     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1492     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1493     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1494 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1495     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1496 #endif
1497 
1498     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1499     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1500     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1501     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1502     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1503 
1504     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1505 };
1506 
/* Register the rbd driver definition with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/*
 * Hook bdrv_rbd_init() up as a block module initializer.
 * NOTE(review): presumably run once during block driver initialization --
 * confirm against the block_init() definition.
 */
block_init(bdrv_rbd_init);
1513