/*
 * QEMU Block driver for NBD
 *
 * Copyright (c) 2019 Virtuozzo International GmbH.
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qemu/uri.h"
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"

#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
#include "qapi/clone-visitor.h"

#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"
#include "block/coroutines.h"

#include "qemu/yank.h"

#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS    16

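/*
 * A request's wire handle is its slot index in s->requests[] XOR-ed with
 * a per-state pointer.  XOR is its own inverse, so
 * HANDLE_TO_INDEX(s, INDEX_TO_HANDLE(s, i)) == i for any index i, and
 * mixing in the pointer makes handles from a stale or foreign connection
 * likely to map to an out-of-range index, which the reply path rejects.
 */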
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))

typedef struct {
    Coroutine *coroutine;
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* waiting for connection_co? */
} NBDClientRequest;

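/*
 * Client state machine: in CONNECTING_WAIT, new requests queue on free_sema
 * until the connection is re-established (bounded by the reconnect-delay
 * timer); in CONNECTING_NOWAIT they fail immediately with -EIO while
 * reconnection continues in the background; QUIT is terminal and no further
 * reconnection is attempted.
 */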
typedef enum NBDClientState {
    NBD_CLIENT_CONNECTING_WAIT,
    NBD_CLIENT_CONNECTING_NOWAIT,
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;

typedef struct BDRVNBDState {
    QIOChannel *ioc; /* The current I/O channel */
    NBDExportInfo info;

    CoMutex send_mutex;
    CoQueue free_sema;
    Coroutine *connection_co;
    Coroutine *teardown_co;
    QemuCoSleep reconnect_sleep;
    bool drained;
    bool wait_drained_end;
    int in_flight;
    NBDClientState state;
    bool wait_in_flight;

    QEMUTimer *reconnect_delay_timer;

    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    BlockDriverState *bs;

    /* Connection parameters */
    uint32_t reconnect_delay;
    SocketAddress *saddr;
    char *export, *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    const char *hostname;
    char *x_dirty_bitmap;
    bool alloc_depth;

    NBDClientConnection *conn;
} BDRVNBDState;

static void nbd_yank(void *opaque);

static void nbd_clear_bdrvstate(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    nbd_client_connection_release(s->conn);
    s->conn = NULL;

    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));

    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    s->saddr = NULL;
    g_free(s->export);
    s->export = NULL;
    g_free(s->tlscredsid);
    s->tlscredsid = NULL;
    g_free(s->x_dirty_bitmap);
    s->x_dirty_bitmap = NULL;
}

static bool nbd_client_connected(BDRVNBDState *s)
{
    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
}

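/*
 * Degrade the client state on a channel error: -EIO (a broken connection)
 * moves a connected client into a reconnecting state, honouring the
 * reconnect-delay setting; any other error shuts the channel down and moves
 * the client to NBD_CLIENT_QUIT, with no reconnection attempted.
 */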
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
    if (ret == -EIO) {
        if (nbd_client_connected(s)) {
            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                            NBD_CLIENT_CONNECTING_NOWAIT;
        }
    } else {
        if (nbd_client_connected(s)) {
            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        s->state = NBD_CLIENT_QUIT;
    }
}

static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            req->receiving = false;
            aio_co_wake(req->coroutine);
        }
    }
}

static void reconnect_delay_timer_del(BDRVNBDState *s)
{
    if (s->reconnect_delay_timer) {
        timer_free(s->reconnect_delay_timer);
        s->reconnect_delay_timer = NULL;
    }
}

static void reconnect_delay_timer_cb(void *opaque)
{
    BDRVNBDState *s = opaque;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        while (qemu_co_enter_next(&s->free_sema, NULL)) {
            /* Resume all queued requests */
        }
    }

    reconnect_delay_timer_del(s);
}

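/*
 * Arm the reconnect-delay timer: if the connection is not re-established by
 * @expire_time_ns, the callback above flips the state to
 * NBD_CLIENT_CONNECTING_NOWAIT so that queued requests fail with -EIO rather
 * than waiting indefinitely.
 */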
static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
        return;
    }

    assert(!s->reconnect_delay_timer);
    s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                             QEMU_CLOCK_REALTIME,
                                             SCALE_NS,
                                             reconnect_delay_timer_cb, s);
    timer_mod(s->reconnect_delay_timer, expire_time_ns);
}

static void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /* Timer is deleted in nbd_client_co_drain_begin() */
    assert(!s->reconnect_delay_timer);
    /*
     * If reconnect is in progress we may have no ->ioc.  It will be
     * re-instantiated in the proper aio context once the connection is
     * reestablished.
     */
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
    }
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->connection_co) {
        /*
         * The node is still drained, so we know the coroutine has yielded in
         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
         * it is entered for the first time. Both places are safe for entering
         * the coroutine.
         */
        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
    }
    bdrv_dec_in_flight(bs);
}

static void nbd_client_attach_aio_context(BlockDriverState *bs,
                                          AioContext *new_context)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * s->connection_co is either yielded from nbd_receive_reply or from
     * nbd_co_reconnect_loop()
     */
    if (nbd_client_connected(s)) {
        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
    }

    bdrv_inc_in_flight(bs);

    /*
     * Need to wait here for the BH to run because the BH must run while the
     * node is still drained.
     */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = true;
    qemu_co_sleep_wake(&s->reconnect_sleep);

    nbd_co_establish_connection_cancel(s->conn);

    reconnect_delay_timer_del(s);

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        qemu_co_queue_restart_all(&s->free_sema);
    }
}

static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = false;
    if (s->wait_drained_end) {
        s->wait_drained_end = false;
        aio_co_wake(s->connection_co);
    }
}


static void nbd_teardown_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->ioc) {
        /* finish any pending coroutines */
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }

    s->state = NBD_CLIENT_QUIT;
    if (s->connection_co) {
        qemu_co_sleep_wake(&s->reconnect_sleep);
        nbd_co_establish_connection_cancel(s->conn);
    }
    if (qemu_in_coroutine()) {
        s->teardown_co = qemu_coroutine_self();
        /* connection_co resumes us when it terminates */
        qemu_coroutine_yield();
        s->teardown_co = NULL;
    } else {
        BDRV_POLL_WHILE(bs, s->connection_co);
    }
    assert(!s->connection_co);
}

static bool nbd_client_connecting(BDRVNBDState *s)
{
    NBDClientState state = qatomic_load_acquire(&s->state);
    return state == NBD_CLIENT_CONNECTING_WAIT ||
        state == NBD_CLIENT_CONNECTING_NOWAIT;
}

static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}

/*
 * Update @bs with information learned during a completed negotiation process.
 * Return failure if the server's advertised options are incompatible with the
 * client's needs.
 */
static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;

    if (s->x_dirty_bitmap) {
        if (!s->info.base_allocation) {
            error_setg(errp, "requested x-dirty-bitmap %s not found",
                       s->x_dirty_bitmap);
            return -EINVAL;
        }
        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
            s->alloc_depth = true;
        }
    }

    if (s->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            return ret;
        }
    }

    if (s->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }

    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
        }
    }

    trace_nbd_client_handshake_success(s->export);

    return 0;
}

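/*
 * Establish a fresh connection on the underlying NBDClientConnection,
 * validate the negotiated options, and publish the new channel: on success
 * the state becomes NBD_CLIENT_CONNECTED and all requests queued on
 * free_sema are restarted.
 */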
int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
                                                Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;

    assert(!s->ioc);

    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
    if (!s->ioc) {
        return -ECONNREFUSED;
    }

    ret = nbd_handle_updated_info(s->bs, NULL);
    if (ret < 0) {
        /*
         * We have connected, but must fail for other reasons.
         * Send NBD_CMD_DISC as a courtesy to the server.
         */
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(s->ioc, &request);

        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;

        return ret;
    }

    qio_channel_set_blocking(s->ioc, false, NULL);
    qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));

    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
                           bs);

    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;
    qemu_co_queue_restart_all(&s->free_sema);

    return 0;
}

static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
    if (!nbd_client_connecting(s)) {
        return;
    }

    /* Wait for completion of all in-flight requests */

    qemu_co_mutex_lock(&s->send_mutex);

    while (s->in_flight > 0) {
        qemu_co_mutex_unlock(&s->send_mutex);
        nbd_recv_coroutines_wake_all(s);
        s->wait_in_flight = true;
        qemu_coroutine_yield();
        s->wait_in_flight = false;
        qemu_co_mutex_lock(&s->send_mutex);
    }

    qemu_co_mutex_unlock(&s->send_mutex);

    if (!nbd_client_connecting(s)) {
        return;
    }

    /*
     * Now we are sure that nobody is accessing the channel, and no one will
     * try until we set the state to CONNECTED.
     */

    /* Finalize previous connection if any */
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    nbd_co_do_establish_connection(s->bs, NULL);
}

static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
{
    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
    }

    nbd_reconnect_attempt(s);

    while (nbd_client_connecting(s)) {
        if (s->drained) {
            bdrv_dec_in_flight(s->bs);
            s->wait_drained_end = true;
            while (s->drained) {
                /*
                 * We may be entered once from nbd_client_attach_aio_context_bh
                 * and once from nbd_client_co_drain_end, hence the loop.
                 */
                qemu_coroutine_yield();
            }
            bdrv_inc_in_flight(s->bs);
        } else {
            qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
                                      QEMU_CLOCK_REALTIME, timeout);
            if (s->drained) {
                continue;
            }
            if (timeout < max_timeout) {
                timeout *= 2;
            }
        }

        nbd_reconnect_attempt(s);
    }

    reconnect_delay_timer_del(s);
}

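/*
 * The lifetime coroutine of the client: it reads every reply header off the
 * wire, wakes the coroutine owning the matching request slot, and drives the
 * reconnect loop when the channel breaks.  It exits only once the state
 * reaches NBD_CLIENT_QUIT.
 */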
static coroutine_fn void nbd_connection_entry(void *opaque)
{
    BDRVNBDState *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */

        if (nbd_client_connecting(s)) {
            nbd_co_reconnect_loop(s);
        }

        if (!nbd_client_connected(s)) {
            continue;
        }

        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            continue;
        }

        /*
         * There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            nbd_channel_error(s, -EINVAL);
            continue;
        }

        /*
         * We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        s->requests[i].receiving = false;
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    qemu_co_queue_restart_all(&s->free_sema);
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    if (s->teardown_co) {
        aio_co_wake(s->teardown_co);
    }
    aio_wait_kick();
}

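/*
 * Reserve a free request slot (waiting on free_sema while all
 * MAX_NBD_REQUESTS slots are in flight or a reconnect is pending), then send
 * the request header, corked together with the optional write payload in
 * @qiov.  Returns 0 on success or a negative errno; on failure the slot is
 * released and a waiter is woken.
 */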
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i = -1;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }

    if (!nbd_client_connected(s)) {
        rc = -EIO;
        goto err;
    }

    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (nbd_client_connected(s) && rc >= 0) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        nbd_channel_error(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
            s->in_flight--;
        }
        if (s->in_flight == 0 && s->wait_in_flight) {
            aio_co_wake(s->connection_co);
        } else {
            qemu_co_queue_next(&s->free_sema);
        }
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

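/*
 * Helpers for walking a structured-reply payload: each reads one big-endian
 * field at *payload and advances the cursor past it, so a parser can call
 * them in wire order, e.g.
 *   offset = payload_advance64(&payload);
 *   hole_size = payload_advance32(&payload);
 */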
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}

static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/*
 * nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in the reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (s->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         s->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                   s->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > s->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             s->info.min_block);
        } else {
            extent->length = s->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    /*
     * HACK: if we are using x-dirty-bitmaps to access
     * qemu:allocation-depth, treat all depths > 2 the same as 2,
     * since nbd_client_co_block_status is only expecting the low two
     * bits to be set.
     */
    if (s->alloc_depth && extent->flags > 2) {
        extent->flags = 2;
    }

    return 0;
}

/*
 * nbd_parse_error_payload
 * On success, @errp contains a message describing the NBD error reply.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

#define NBD_MAX_MALLOC_PAYLOAD 1000
static coroutine_fn int nbd_co_receive_structured_payload(
        BDRVNBDState *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/*
 * nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If the function fails, @errp contains the corresponding error message, and
 * the connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    assert(!s->requests[i].receiving);
    if (!nbd_client_connected(s)) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/*
 * nbd_co_receive_one_chunk
 * Read reply, wake up connection_co and put the channel into an error state
 * if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        nbd_channel_error(s, ret);
    } else {
        /* For assert at loop start in nbd_connection_entry */
        *reply = s->reply;
    }
    s->reply.handle = 0;

    if (s->connection_co && !s->wait_in_flight) {
        /*
         * We must check s->wait_in_flight, because we may have been entered
         * by nbd_recv_coroutines_wake_all(), in which case we should not
         * wake connection_co here; it will be woken by the last request.
         */
        aio_co_wake(s->connection_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(local_err && *local_err);
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/*
 * NBD_FOREACH_REPLY_CHUNK
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)

/*
 * nbd_reply_chunk_iter_receive
 * The pointer stored in @payload requires g_free() to free it.
 */
static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (!nbd_client_connected(s)) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not run the body of NBD_FOREACH_REPLY_CHUNK for a simple reply. */
    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    if (s->in_flight == 0 && s->wait_in_flight) {
        aio_co_wake(s->connection_co);
    } else {
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /*
             * special cased in nbd_co_receive_one_chunk, data is already
             * in qiov
             */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* reply type not allowed here */
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

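/*
 * Common send/receive path for requests that carry no read data: send the
 * request (with an optional write payload), collect the reply chunks, and
 * retry the whole exchange while the client is in the reconnect-wait state.
 */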
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }

    do {
        ret = nbd_co_send_request(bs, request, write_qiov);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_return_code(s, request->handle,
                                         &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request->from, request->len,
                                      request->handle, request->flags,
                                      request->type,
                                      nbd_cmd_lookup(request->type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                                uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= s->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > s->info.size) {
        uint64_t slop = offset + bytes - s->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
                                           &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                                 uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int bytes, BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }
    if (flags & BDRV_REQ_NO_FALLBACK) {
        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
                                  int bytes)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

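/*
 * Query base:allocation block status for a single range.  The request is
 * capped to the server's advertised size and sent with
 * NBD_CMD_FLAG_REQ_ONE, so at most one extent is expected back.
 */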
static int coroutine_fn nbd_client_co_block_status(
        BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
        int64_t *pnum, int64_t *map, BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
                   MIN(bytes, s->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!s->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= s->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (s->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
    }
    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
                                               &extent, &request_ret,
                                               &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
        (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
        BDRV_BLOCK_OFFSET_VALID;
}

static int nbd_client_reopen_prepare(BDRVReopenState *state,
                                     BlockReopenQueue *queue, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;

    if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
        error_setg(errp, "Can't reopen read-only NBD mount as read/write");
        return -EACCES;
    }
    return 0;
}

static void nbd_yank(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
    qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}

static void nbd_client_close(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (s->ioc) {
        nbd_send_request(s->ioc, &request);
    }

    nbd_teardown_connection(bs);
}


/*
 * Parse nbd_open options
 */

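/*
 * Convert an NBD URI into QDict options.  The forms handled below are
 *   nbd://host[:port]/export
 *   nbd+tcp://host[:port]/export
 *   nbd+unix:///export?socket=path
 * Query parameters other than exactly one "socket" (for nbd+unix) are
 * rejected.
 */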
1582 static int nbd_parse_uri(const char *filename, QDict *options)
1583 {
1584     URI *uri;
1585     const char *p;
1586     QueryParams *qp = NULL;
1587     int ret = 0;
1588     bool is_unix;
1589 
1590     uri = uri_parse(filename);
1591     if (!uri) {
1592         return -EINVAL;
1593     }
1594 
1595     /* transport */
1596     if (!g_strcmp0(uri->scheme, "nbd")) {
1597         is_unix = false;
1598     } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
1599         is_unix = false;
1600     } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
1601         is_unix = true;
1602     } else {
1603         ret = -EINVAL;
1604         goto out;
1605     }
1606 
1607     p = uri->path ? uri->path : "";
1608     if (p[0] == '/') {
1609         p++;
1610     }
1611     if (p[0]) {
1612         qdict_put_str(options, "export", p);
1613     }
1614 
1615     qp = query_params_parse(uri->query);
1616     if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
1617         ret = -EINVAL;
1618         goto out;
1619     }
1620 
1621     if (is_unix) {
1622         /* nbd+unix:///export?socket=path */
1623         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
1624             ret = -EINVAL;
1625             goto out;
1626         }
1627         qdict_put_str(options, "server.type", "unix");
1628         qdict_put_str(options, "server.path", qp->p[0].value);
1629     } else {
1630         QString *host;
1631         char *port_str;
1632 
1633         /* nbd[+tcp]://host[:port]/export */
1634         if (!uri->server) {
1635             ret = -EINVAL;
1636             goto out;
1637         }
1638 
1639         /* strip braces from literal IPv6 address */
1640         if (uri->server[0] == '[') {
1641             host = qstring_from_substr(uri->server, 1,
1642                                        strlen(uri->server) - 1);
1643         } else {
1644             host = qstring_from_str(uri->server);
1645         }
1646 
1647         qdict_put_str(options, "server.type", "inet");
1648         qdict_put(options, "server.host", host);
1649 
1650         port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
1651         qdict_put_str(options, "server.port", port_str);
1652         g_free(port_str);
1653     }
1654 
1655 out:
1656     if (qp) {
1657         query_params_free(qp);
1658     }
1659     uri_free(uri);
1660     return ret;
1661 }
1662 
1663 static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
1664 {
1665     const QDictEntry *e;
1666 
1667     for (e = qdict_first(options); e; e = qdict_next(options, e)) {
1668         if (!strcmp(e->key, "host") ||
1669             !strcmp(e->key, "port") ||
1670             !strcmp(e->key, "path") ||
1671             !strcmp(e->key, "export") ||
1672             strstart(e->key, "server.", NULL))
1673         {
1674             error_setg(errp, "Option '%s' cannot be used with a file name",
1675                        e->key);
1676             return true;
1677         }
1678     }
1679 
1680     return false;
1681 }
1682 
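/*
 * Accepts either a URI (handed off to nbd_parse_uri() above) or the legacy
 * colon-separated syntax, for example (values are illustrative):
 *
 *   nbd:unix:/tmp/nbd.sock:exportname=myexport
 *   nbd:localhost:10809:exportname=myexport
 *
 * A trailing ":exportname=..." suffix is split off first; the remaining
 * host_spec then selects a Unix or TCP server address.
 */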
1683 static void nbd_parse_filename(const char *filename, QDict *options,
1684                                Error **errp)
1685 {
1686     g_autofree char *file = NULL;
1687     char *export_name;
1688     const char *host_spec;
1689     const char *unixpath;
1690 
1691     if (nbd_has_filename_options_conflict(options, errp)) {
1692         return;
1693     }
1694 
1695     if (strstr(filename, "://")) {
1696         int ret = nbd_parse_uri(filename, options);
1697         if (ret < 0) {
1698             error_setg(errp, "No valid URL specified");
1699         }
1700         return;
1701     }
1702 
1703     file = g_strdup(filename);
1704 
1705     export_name = strstr(file, EN_OPTSTR);
1706     if (export_name) {
1707         if (export_name[strlen(EN_OPTSTR)] == 0) {
1708             return;
1709         }
1710         export_name[0] = 0; /* truncate 'file' */
1711         export_name += strlen(EN_OPTSTR);
1712 
1713         qdict_put_str(options, "export", export_name);
1714     }
1715 
1716     /* extract the host_spec - fail if it's not nbd:... */
1717     if (!strstart(file, "nbd:", &host_spec)) {
1718         error_setg(errp, "File name string for NBD must start with 'nbd:'");
1719         return;
1720     }
1721 
1722     if (!*host_spec) {
1723         return;
1724     }
1725 
1726     /* are we a UNIX or TCP socket? */
1727     if (strstart(host_spec, "unix:", &unixpath)) {
1728         qdict_put_str(options, "server.type", "unix");
1729         qdict_put_str(options, "server.path", unixpath);
1730     } else {
1731         InetSocketAddress *addr = g_new(InetSocketAddress, 1);
1732 
1733         if (inet_parse(addr, host_spec, errp)) {
1734             goto out_inet;
1735         }
1736 
1737         qdict_put_str(options, "server.type", "inet");
1738         qdict_put_str(options, "server.host", addr->host);
1739         qdict_put_str(options, "server.port", addr->port);
1740     out_inet:
1741         qapi_free_InetSocketAddress(addr);
1742     }
1743 }
1744 
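/*
 * Translate the legacy flat options into the structured "server." form.
 * Illustratively, host=localhost,port=10809 becomes server.type=inet,
 * server.host=localhost, server.port=10809, and path=/tmp/nbd.sock becomes
 * server.type=unix, server.path=/tmp/nbd.sock.  Mixing the legacy keys with
 * explicit server.* options is rejected.
 */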
1745 static bool nbd_process_legacy_socket_options(QDict *output_options,
1746                                               QemuOpts *legacy_opts,
1747                                               Error **errp)
1748 {
1749     const char *path = qemu_opt_get(legacy_opts, "path");
1750     const char *host = qemu_opt_get(legacy_opts, "host");
1751     const char *port = qemu_opt_get(legacy_opts, "port");
1752     const QDictEntry *e;
1753 
1754     if (!path && !host && !port) {
1755         return true;
1756     }
1757 
1758     for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
1759     {
1760         if (strstart(e->key, "server.", NULL)) {
1761             error_setg(errp, "Cannot use 'server' and path/host/port at the "
1762                        "same time");
1763             return false;
1764         }
1765     }
1766 
1767     if (path && host) {
1768         error_setg(errp, "path and host may not be used at the same time");
1769         return false;
1770     } else if (path) {
1771         if (port) {
1772             error_setg(errp, "port may not be used without host");
1773             return false;
1774         }
1775 
1776         qdict_put_str(output_options, "server.type", "unix");
1777         qdict_put_str(output_options, "server.path", path);
1778     } else if (host) {
1779         qdict_put_str(output_options, "server.type", "inet");
1780         qdict_put_str(output_options, "server.host", host);
1781         qdict_put_str(output_options, "server.port",
1782                       port ?: stringify(NBD_DEFAULT_PORT));
1783     }
1784 
1785     return true;
1786 }
1787 
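/*
 * Extract the "server." sub-dictionary assembled above and convert it into
 * a SocketAddress via the QAPI input visitor; as a final step, an fd-type
 * address given by name is resolved to a numeric file descriptor.
 */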
1788 static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
1789                                  Error **errp)
1790 {
1791     SocketAddress *saddr = NULL;
1792     QDict *addr = NULL;
1793     Visitor *iv = NULL;
1794 
1795     qdict_extract_subqdict(options, &addr, "server.");
1796     if (!qdict_size(addr)) {
1797         error_setg(errp, "NBD server address missing");
1798         goto done;
1799     }
1800 
1801     iv = qobject_input_visitor_new_flat_confused(addr, errp);
1802     if (!iv) {
1803         goto done;
1804     }
1805 
1806     if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
1807         goto done;
1808     }
1809 
1810     if (socket_address_parse_named_fd(saddr, errp) < 0) {
1811         qapi_free_SocketAddress(saddr);
1812         saddr = NULL;
1813         goto done;
1814     }
1815 
1816 done:
1817     qobject_unref(addr);
1818     visit_free(iv);
1819     return saddr;
1820 }
1821 
1822 static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
1823 {
1824     Object *obj;
1825     QCryptoTLSCreds *creds;
1826 
1827     obj = object_resolve_path_component(
1828         object_get_objects_root(), id);
1829     if (!obj) {
1830         error_setg(errp, "No TLS credentials with id '%s'",
1831                    id);
1832         return NULL;
1833     }
1834     creds = (QCryptoTLSCreds *)
1835         object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
1836     if (!creds) {
1837         error_setg(errp, "Object with id '%s' is not TLS credentials",
1838                    id);
1839         return NULL;
1840     }
1841 
1842     if (!qcrypto_tls_creds_check_endpoint(creds,
1843                                           QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
1844                                           errp)) {
1845         return NULL;
1846     }
1847     object_ref(obj);
1848     return creds;
1849 }
1850 
1851 
1852 static QemuOptsList nbd_runtime_opts = {
1853     .name = "nbd",
1854     .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
1855     .desc = {
1856         {
1857             .name = "host",
1858             .type = QEMU_OPT_STRING,
1859             .help = "TCP host to connect to",
1860         },
1861         {
1862             .name = "port",
1863             .type = QEMU_OPT_STRING,
1864             .help = "TCP port to connect to",
1865         },
1866         {
1867             .name = "path",
1868             .type = QEMU_OPT_STRING,
1869             .help = "Unix socket path to connect to",
1870         },
1871         {
1872             .name = "export",
1873             .type = QEMU_OPT_STRING,
1874             .help = "Name of the NBD export to open",
1875         },
1876         {
1877             .name = "tls-creds",
1878             .type = QEMU_OPT_STRING,
1879             .help = "ID of the TLS credentials to use",
1880         },
1881         {
1882             .name = "x-dirty-bitmap",
1883             .type = QEMU_OPT_STRING,
1884             .help = "experimental: expose named dirty bitmap in place of "
1885                     "block status",
1886         },
1887         {
1888             .name = "reconnect-delay",
1889             .type = QEMU_OPT_NUMBER,
1890             .help = "On an unexpected disconnect, the NBD client tries to "
1891                     "connect again until it succeeds or encounters a serious "
1892                     "error.  During the first @reconnect-delay seconds, all "
1893                     "requests are paused and will be rerun on a successful "
1894                     "reconnect. After that time, any delayed requests and all "
1895                     "future requests before a successful reconnect will "
1896                     "fail immediately. Default: 0",
1897         },
1898         { /* end of list */ }
1899     },
1900 };
1901 
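/*
 * A minimal illustrative invocation wiring these options together (node
 * name, host, port and export are examples only):
 *
 *   qemu-system-x86_64 -blockdev driver=nbd,node-name=nbd0,\
 *       server.type=inet,server.host=localhost,server.port=10809,\
 *       export=myexport,reconnect-delay=10
 */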
1902 static int nbd_process_options(BlockDriverState *bs, QDict *options,
1903                                Error **errp)
1904 {
1905     BDRVNBDState *s = bs->opaque;
1906     QemuOpts *opts;
1907     int ret = -EINVAL;
1908 
1909     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
1910     if (!qemu_opts_absorb_qdict(opts, options, errp)) {
1911         goto error;
1912     }
1913 
1914     /* Translate @host, @port, and @path to a SocketAddress */
1915     if (!nbd_process_legacy_socket_options(options, opts, errp)) {
1916         goto error;
1917     }
1918 
1919     /* Pop the config into our state object. Exit if invalid. */
1920     s->saddr = nbd_config(s, options, errp);
1921     if (!s->saddr) {
1922         goto error;
1923     }
1924 
1925     s->export = g_strdup(qemu_opt_get(opts, "export"));
1926     if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
1927         error_setg(errp, "export name too long to send to server");
1928         goto error;
1929     }
1930 
1931     s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
1932     if (s->tlscredsid) {
1933         s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
1934         if (!s->tlscreds) {
1935             goto error;
1936         }
1937 
1938         /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
1939         if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
1940             error_setg(errp, "TLS only supported over IP sockets");
1941             goto error;
1942         }
1943         s->hostname = s->saddr->u.inet.host;
1944     }
1945 
1946     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
1947     if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
1948         error_setg(errp, "x-dirty-bitmap query too long to send to server");
1949         goto error;
1950     }
1951 
1952     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
1953 
1954     ret = 0;
1955 
1956  error:
1957     qemu_opts_del(opts);
1958     return ret;
1959 }
1960 
1961 static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
1962                     Error **errp)
1963 {
1964     int ret;
1965     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1966 
1967     s->bs = bs;
1968     qemu_co_mutex_init(&s->send_mutex);
1969     qemu_co_queue_init(&s->free_sema);
1970 
1971     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
1972         return -EEXIST;
1973     }
1974 
1975     ret = nbd_process_options(bs, options, errp);
1976     if (ret < 0) {
1977         goto fail;
1978     }
1979 
1980     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
1981                                         s->x_dirty_bitmap, s->tlscreds);
1982 
1983     /* TODO: Configurable retry-until-timeout behaviour. */
1984     ret = nbd_do_establish_connection(bs, errp);
1985     if (ret < 0) {
1986         goto fail;
1987     }
1988 
1989     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
1990     bdrv_inc_in_flight(bs);
1991     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
1992 
1993     return 0;
1994 
1995 fail:
1996     nbd_clear_bdrvstate(bs);
1997     return ret;
1998 }
1999 
2000 static int nbd_co_flush(BlockDriverState *bs)
2001 {
2002     return nbd_client_co_flush(bs);
2003 }
2004 
2005 static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
2006 {
2007     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
2008     uint32_t min = s->info.min_block;
2009     uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);
2010 
2011     /*
2012      * If the server did not advertise an alignment:
2013      * - a size that is not sector-aligned implies that an alignment
2014      *   of 1 can be used to access those tail bytes
2015      * - advertisement of block status requires an alignment of 1, so
2016      *   that we don't violate block layer constraints that block
2017      *   status is always aligned (as we can't control whether the
2018      *   server will report sub-sector extents, such as a hole at EOF
2019      *   on an unaligned POSIX file)
2020      * - otherwise, assume the server is so old that we are safer avoiding
2021      *   sub-sector requests
2022      */
2023     if (!min) {
2024         min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
2025                s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
2026     }
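    /*
     * Illustrative example: a 1000-byte export with no advertised alignment
     * has an unaligned tail, so min becomes 1; a 1 MiB export without block
     * status support keeps the conservative BDRV_SECTOR_SIZE.
     */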
2027 
2028     bs->bl.request_alignment = min;
2029     bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
2030     bs->bl.max_pwrite_zeroes = max;
2031     bs->bl.max_transfer = max;
2032 
2033     if (s->info.opt_block &&
2034         s->info.opt_block > bs->bl.opt_transfer) {
2035         bs->bl.opt_transfer = s->info.opt_block;
2036     }
2037 }
2038 
2039 static void nbd_close(BlockDriverState *bs)
2040 {
2041     nbd_client_close(bs);
2042     nbd_clear_bdrvstate(bs);
2043 }
2044 
2045 /*
2046  * NBD cannot truncate, but if the caller asks to truncate to the same size, or
2047  * to a smaller size with exact=false, there is no reason to fail the
2048  * operation.
2049  *
2050  * Preallocation mode is ignored since it does not seem useful to fail when
2051  * we never change anything.
2052  */
2053 static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
2054                                         bool exact, PreallocMode prealloc,
2055                                         BdrvRequestFlags flags, Error **errp)
2056 {
2057     BDRVNBDState *s = bs->opaque;
2058 
2059     if (offset != s->info.size && exact) {
2060         error_setg(errp, "Cannot resize NBD nodes");
2061         return -ENOTSUP;
2062     }
2063 
2064     if (offset > s->info.size) {
2065         error_setg(errp, "Cannot grow NBD nodes");
2066         return -EINVAL;
2067     }
2068 
2069     return 0;
2070 }
2071 
2072 static int64_t nbd_getlength(BlockDriverState *bs)
2073 {
2074     BDRVNBDState *s = bs->opaque;
2075 
2076     return s->info.size;
2077 }
2078 
2079 static void nbd_refresh_filename(BlockDriverState *bs)
2080 {
2081     BDRVNBDState *s = bs->opaque;
2082     const char *host = NULL, *port = NULL, *path = NULL;
2083     size_t len = 0;
2084 
2085     if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
2086         const InetSocketAddress *inet = &s->saddr->u.inet;
2087         if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
2088             host = inet->host;
2089             port = inet->port;
2090         }
2091     } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
2092         path = s->saddr->u.q_unix.path;
2093     } /* else can't represent as pseudo-filename */
2094 
2095     if (path && s->export) {
2096         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2097                        "nbd+unix:///%s?socket=%s", s->export, path);
2098     } else if (path && !s->export) {
2099         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2100                        "nbd+unix://?socket=%s", path);
2101     } else if (host && s->export) {
2102         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2103                        "nbd://%s:%s/%s", host, port, s->export);
2104     } else if (host && !s->export) {
2105         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2106                        "nbd://%s:%s", host, port);
2107     }
2108     if (len >= sizeof(bs->exact_filename)) {
2109         /* Name is too long to represent exactly, so leave it empty. */
2110         bs->exact_filename[0] = '\0';
2111     }
2112 }
2113 
2114 static char *nbd_dirname(BlockDriverState *bs, Error **errp)
2115 {
2116     /* The generic bdrv_dirname() implementation is able to work out some
2117      * directory name for NBD nodes, but that would be wrong. So far there is no
2118      * specification for how "export paths" would work, so NBD does not have
2119      * directory names. */
2120     error_setg(errp, "Cannot generate a base directory for NBD nodes");
2121     return NULL;
2122 }
2123 
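/*
 * Options that pin down which remote export this node talks to; reopening
 * with different values for these would generally mean addressing different
 * data.  The trailing dot on "server." is intended to cover every server.*
 * sub-option.
 */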
2124 static const char *const nbd_strong_runtime_opts[] = {
2125     "path",
2126     "host",
2127     "port",
2128     "export",
2129     "tls-creds",
2130     "server.",
2131 
2132     NULL
2133 };
2134 
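/*
 * Give up on a pending reconnect: drop the delay timer and flip a waiting
 * client to NBD_CLIENT_CONNECTING_NOWAIT, so that queued requests fail
 * immediately instead of waiting out reconnect-delay.
 */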
2135 static void nbd_cancel_in_flight(BlockDriverState *bs)
2136 {
2137     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
2138 
2139     reconnect_delay_timer_del(s);
2140 
2141     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
2142         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
2143         qemu_co_queue_restart_all(&s->free_sema);
2144     }
2145 }
2146 
2147 static BlockDriver bdrv_nbd = {
2148     .format_name                = "nbd",
2149     .protocol_name              = "nbd",
2150     .instance_size              = sizeof(BDRVNBDState),
2151     .bdrv_parse_filename        = nbd_parse_filename,
2152     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2153     .create_opts                = &bdrv_create_opts_simple,
2154     .bdrv_file_open             = nbd_open,
2155     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2156     .bdrv_co_preadv             = nbd_client_co_preadv,
2157     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2158     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2159     .bdrv_close                 = nbd_close,
2160     .bdrv_co_flush_to_os        = nbd_co_flush,
2161     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2162     .bdrv_refresh_limits        = nbd_refresh_limits,
2163     .bdrv_co_truncate           = nbd_co_truncate,
2164     .bdrv_getlength             = nbd_getlength,
2165     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2166     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2167     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2168     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2169     .bdrv_refresh_filename      = nbd_refresh_filename,
2170     .bdrv_co_block_status       = nbd_client_co_block_status,
2171     .bdrv_dirname               = nbd_dirname,
2172     .strong_runtime_opts        = nbd_strong_runtime_opts,
2173     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2174 };
2175 
2176 static BlockDriver bdrv_nbd_tcp = {
2177     .format_name                = "nbd",
2178     .protocol_name              = "nbd+tcp",
2179     .instance_size              = sizeof(BDRVNBDState),
2180     .bdrv_parse_filename        = nbd_parse_filename,
2181     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2182     .create_opts                = &bdrv_create_opts_simple,
2183     .bdrv_file_open             = nbd_open,
2184     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2185     .bdrv_co_preadv             = nbd_client_co_preadv,
2186     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2187     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2188     .bdrv_close                 = nbd_close,
2189     .bdrv_co_flush_to_os        = nbd_co_flush,
2190     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2191     .bdrv_refresh_limits        = nbd_refresh_limits,
2192     .bdrv_co_truncate           = nbd_co_truncate,
2193     .bdrv_getlength             = nbd_getlength,
2194     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2195     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2196     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2197     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2198     .bdrv_refresh_filename      = nbd_refresh_filename,
2199     .bdrv_co_block_status       = nbd_client_co_block_status,
2200     .bdrv_dirname               = nbd_dirname,
2201     .strong_runtime_opts        = nbd_strong_runtime_opts,
2202     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2203 };
2204 
2205 static BlockDriver bdrv_nbd_unix = {
2206     .format_name                = "nbd",
2207     .protocol_name              = "nbd+unix",
2208     .instance_size              = sizeof(BDRVNBDState),
2209     .bdrv_parse_filename        = nbd_parse_filename,
2210     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2211     .create_opts                = &bdrv_create_opts_simple,
2212     .bdrv_file_open             = nbd_open,
2213     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2214     .bdrv_co_preadv             = nbd_client_co_preadv,
2215     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2216     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2217     .bdrv_close                 = nbd_close,
2218     .bdrv_co_flush_to_os        = nbd_co_flush,
2219     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2220     .bdrv_refresh_limits        = nbd_refresh_limits,
2221     .bdrv_co_truncate           = nbd_co_truncate,
2222     .bdrv_getlength             = nbd_getlength,
2223     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2224     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2225     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2226     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2227     .bdrv_refresh_filename      = nbd_refresh_filename,
2228     .bdrv_co_block_status       = nbd_client_co_block_status,
2229     .bdrv_dirname               = nbd_dirname,
2230     .strong_runtime_opts        = nbd_strong_runtime_opts,
2231     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2232 };
2233 
2234 static void bdrv_nbd_init(void)
2235 {
2236     bdrv_register(&bdrv_nbd);
2237     bdrv_register(&bdrv_nbd_tcp);
2238     bdrv_register(&bdrv_nbd_unix);
2239 }
2240 
2241 block_init(bdrv_nbd_init);
2242