xref: /qemu/block/nbd.c (revision f9734d5d)
/*
 * QEMU Block driver for NBD
 *
 * Copyright (c) 2019 Virtuozzo International GmbH.
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qemu/uri.h"
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"

#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
#include "qapi/clone-visitor.h"

#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"
#include "block/coroutines.h"

#include "qemu/yank.h"

#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS    16

#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
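
/*
 * A request handle on the wire is the request's slot index XORed with the
 * BlockDriverState pointer, so HANDLE_TO_INDEX() and INDEX_TO_HANDLE() are
 * inverses of each other.  Illustrative only: for slot 3 on a given bs,
 * INDEX_TO_HANDLE(bs, 3) yields a handle that decodes back to 3 via
 * HANDLE_TO_INDEX(bs, handle), while a handle belonging to a different bs
 * decodes to an out-of-range index and is rejected in nbd_connection_entry().
 */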

typedef struct {
    Coroutine *coroutine;
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* waiting for connection_co? */
} NBDClientRequest;

typedef enum NBDClientState {
    NBD_CLIENT_CONNECTING_WAIT,
    NBD_CLIENT_CONNECTING_NOWAIT,
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;
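
/*
 * Sketch of the client state machine as implemented below (derived from
 * nbd_channel_error() and the reconnect code, not from a separate spec):
 *
 *   CONNECTED --(recoverable I/O error, reconnect_delay set)--> CONNECTING_WAIT
 *   CONNECTED --(recoverable I/O error, no reconnect_delay)---> CONNECTING_NOWAIT
 *   CONNECTING_WAIT --(delay timer fires or drain begins)-----> CONNECTING_NOWAIT
 *   CONNECTING_* --(handshake succeeds)-----------------------> CONNECTED
 *   any state --(fatal error, yank, or teardown)--------------> QUIT
 *
 * In CONNECTING_WAIT new requests block on free_sema until reconnect
 * succeeds or the delay expires; in CONNECTING_NOWAIT they fail with -EIO.
 */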

typedef struct BDRVNBDState {
    QIOChannel *ioc; /* The current I/O channel */
    NBDExportInfo info;

    CoMutex send_mutex;
    CoQueue free_sema;
    Coroutine *connection_co;
    Coroutine *teardown_co;
    QemuCoSleep reconnect_sleep;
    bool drained;
    bool wait_drained_end;
    int in_flight;
    NBDClientState state;
    bool wait_in_flight;

    QEMUTimer *reconnect_delay_timer;

    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    BlockDriverState *bs;

    /* Connection parameters */
    uint32_t reconnect_delay;
    SocketAddress *saddr;
    char *export, *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    const char *hostname;
    char *x_dirty_bitmap;
    bool alloc_depth;

    NBDClientConnection *conn;
} BDRVNBDState;

static void nbd_yank(void *opaque);

static void nbd_clear_bdrvstate(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    nbd_client_connection_release(s->conn);
    s->conn = NULL;

    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));

    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    s->saddr = NULL;
    g_free(s->export);
    s->export = NULL;
    g_free(s->tlscredsid);
    s->tlscredsid = NULL;
    g_free(s->x_dirty_bitmap);
    s->x_dirty_bitmap = NULL;
}

static bool nbd_client_connected(BDRVNBDState *s)
{
    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
}

static void nbd_channel_error(BDRVNBDState *s, int ret)
{
    if (ret == -EIO) {
        if (nbd_client_connected(s)) {
            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                            NBD_CLIENT_CONNECTING_NOWAIT;
        }
    } else {
        if (nbd_client_connected(s)) {
            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        s->state = NBD_CLIENT_QUIT;
    }
}

static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            req->receiving = false;
            aio_co_wake(req->coroutine);
        }
    }
}

static void reconnect_delay_timer_del(BDRVNBDState *s)
{
    if (s->reconnect_delay_timer) {
        timer_free(s->reconnect_delay_timer);
        s->reconnect_delay_timer = NULL;
    }
}

static void reconnect_delay_timer_cb(void *opaque)
{
    BDRVNBDState *s = opaque;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        while (qemu_co_enter_next(&s->free_sema, NULL)) {
            /* Resume all queued requests */
        }
    }

    reconnect_delay_timer_del(s);
}

static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
        return;
    }

    assert(!s->reconnect_delay_timer);
    s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                             QEMU_CLOCK_REALTIME,
                                             SCALE_NS,
                                             reconnect_delay_timer_cb, s);
    timer_mod(s->reconnect_delay_timer, expire_time_ns);
}
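
/*
 * Note on the reconnect-delay timer (summarising the code above): it is
 * armed only in CONNECTING_WAIT and fires reconnect_delay seconds after the
 * connection broke.  Until then, requests queue on free_sema; once it fires,
 * the state moves to CONNECTING_NOWAIT and queued requests are resumed so
 * they can fail fast instead of blocking indefinitely.
 */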

static void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /* Timer is deleted in nbd_client_co_drain_begin() */
    assert(!s->reconnect_delay_timer);
    /*
     * If reconnect is in progress we may have no ->ioc.  It will be
     * re-instantiated in the proper aio context once the connection is
     * reestablished.
     */
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
    }
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->connection_co) {
        /*
         * The node is still drained, so we know the coroutine has yielded in
         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
         * it is entered for the first time. Both places are safe for entering
         * the coroutine.
         */
        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
    }
    bdrv_dec_in_flight(bs);
}

static void nbd_client_attach_aio_context(BlockDriverState *bs,
                                          AioContext *new_context)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * s->connection_co is either yielded from nbd_receive_reply or from
     * nbd_co_reconnect_loop()
     */
    if (nbd_client_connected(s)) {
        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
    }

    bdrv_inc_in_flight(bs);

    /*
     * Need to wait here for the BH to run because the BH must run while the
     * node is still drained.
     */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = true;
    qemu_co_sleep_wake(&s->reconnect_sleep);

    nbd_co_establish_connection_cancel(s->conn);

    reconnect_delay_timer_del(s);

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        qemu_co_queue_restart_all(&s->free_sema);
    }
}

static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = false;
    if (s->wait_drained_end) {
        s->wait_drained_end = false;
        aio_co_wake(s->connection_co);
    }
}


static void nbd_teardown_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->ioc) {
        /* finish any pending coroutines */
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }

    s->state = NBD_CLIENT_QUIT;
    if (s->connection_co) {
        qemu_co_sleep_wake(&s->reconnect_sleep);
        nbd_co_establish_connection_cancel(s->conn);
    }
    if (qemu_in_coroutine()) {
        s->teardown_co = qemu_coroutine_self();
        /* connection_co resumes us when it terminates */
        qemu_coroutine_yield();
        s->teardown_co = NULL;
    } else {
        BDRV_POLL_WHILE(bs, s->connection_co);
    }
    assert(!s->connection_co);
}

static bool nbd_client_connecting(BDRVNBDState *s)
{
    NBDClientState state = qatomic_load_acquire(&s->state);
    return state == NBD_CLIENT_CONNECTING_WAIT ||
        state == NBD_CLIENT_CONNECTING_NOWAIT;
}

static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}

/*
 * Update @bs with information learned during a completed negotiation process.
 * Return failure if the server's advertised options are incompatible with the
 * client's needs.
 */
static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;

    if (s->x_dirty_bitmap) {
        if (!s->info.base_allocation) {
            error_setg(errp, "requested x-dirty-bitmap %s not found",
                       s->x_dirty_bitmap);
            return -EINVAL;
        }
        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
            s->alloc_depth = true;
        }
    }

    if (s->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            return ret;
        }
    }

    if (s->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }

    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
        }
    }

    trace_nbd_client_handshake_success(s->export);

    return 0;
}

int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
                                                Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;

    assert(!s->ioc);

    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
    if (!s->ioc) {
        return -ECONNREFUSED;
    }

    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
                           bs);

    ret = nbd_handle_updated_info(s->bs, NULL);
    if (ret < 0) {
        /*
         * We have connected, but must fail for other reasons.
         * Send NBD_CMD_DISC as a courtesy to the server.
         */
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(s->ioc, &request);

        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;

        return ret;
    }

    qio_channel_set_blocking(s->ioc, false, NULL);
    qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));

    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;
    qemu_co_queue_restart_all(&s->free_sema);

    return 0;
}

static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
    if (!nbd_client_connecting(s)) {
        return;
    }

    /* Wait for completion of all in-flight requests */

    qemu_co_mutex_lock(&s->send_mutex);

    while (s->in_flight > 0) {
        qemu_co_mutex_unlock(&s->send_mutex);
        nbd_recv_coroutines_wake_all(s);
        s->wait_in_flight = true;
        qemu_coroutine_yield();
        s->wait_in_flight = false;
        qemu_co_mutex_lock(&s->send_mutex);
    }

    qemu_co_mutex_unlock(&s->send_mutex);

    if (!nbd_client_connecting(s)) {
        return;
    }

    /*
     * Now we are sure that nobody is accessing the channel, and no one will
     * try until we set the state to CONNECTED.
     */

    /* Finalize previous connection if any */
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    nbd_co_do_establish_connection(s->bs, NULL);
}

static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
{
    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
    }

    nbd_reconnect_attempt(s);

    while (nbd_client_connecting(s)) {
        if (s->drained) {
            bdrv_dec_in_flight(s->bs);
            s->wait_drained_end = true;
            while (s->drained) {
                /*
                 * We may be entered once from nbd_client_attach_aio_context_bh
                 * and then again from nbd_client_co_drain_end, hence the loop.
                 */
                qemu_coroutine_yield();
            }
            bdrv_inc_in_flight(s->bs);
        } else {
            qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
                                      QEMU_CLOCK_REALTIME, timeout);
            if (s->drained) {
                continue;
            }
            if (timeout < max_timeout) {
                timeout *= 2;
            }
        }

        nbd_reconnect_attempt(s);
    }

    reconnect_delay_timer_del(s);
}
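
/*
 * The loop above retries with exponential backoff: attempts are spaced
 * 1, 2, 4, 8 and then (capped) 16 seconds apart, e.g.:
 *
 *   attempt:  1    2    3    4    5    6   ...
 *   sleep:    1s   2s   4s   8s   16s  16s ...
 *
 * The sleep is wakeable so that drain or teardown can interrupt it early.
 */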

static coroutine_fn void nbd_connection_entry(void *opaque)
{
    BDRVNBDState *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */

        if (nbd_client_connecting(s)) {
            nbd_co_reconnect_loop(s);
        }

        if (!nbd_client_connected(s)) {
            continue;
        }

        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            continue;
        }

        /*
         * There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            nbd_channel_error(s, -EINVAL);
            continue;
        }

        /*
         * We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        s->requests[i].receiving = false;
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    qemu_co_queue_restart_all(&s->free_sema);
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    if (s->teardown_co) {
        aio_co_wake(s->teardown_co);
    }
    aio_wait_kick();
}

static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i = -1;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }

    if (!nbd_client_connected(s)) {
        rc = -EIO;
        goto err;
    }

    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (nbd_client_connected(s) && rc >= 0) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        nbd_channel_error(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
            s->in_flight--;
        }
        if (s->in_flight == 0 && s->wait_in_flight) {
            aio_co_wake(s->connection_co);
        } else {
            qemu_co_queue_next(&s->free_sema);
        }
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
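
/*
 * Corking note on nbd_co_send_request() above: for writes, the request
 * header and the payload are both queued between qio_channel_set_cork(true)
 * and qio_channel_set_cork(false), so the transport can coalesce them
 * instead of emitting a small header segment followed by the data.
 */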

static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}
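
/*
 * These helpers consume big-endian fields from a chunk payload, advancing
 * the cursor as they go.  A minimal sketch of parsing an OFFSET_HOLE
 * payload (a 64-bit offset followed by a 32-bit hole size, as done in
 * nbd_parse_offset_hole_payload() below):
 *
 *     uint8_t *p = payload;
 *     uint64_t offset    = payload_advance64(&p);
 *     uint32_t hole_size = payload_advance32(&p);
 */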

static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/*
 * nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (s->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         s->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                              s->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > s->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             s->info.min_block);
        } else {
            extent->length = s->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    /*
     * HACK: if we are using x-dirty-bitmaps to access
     * qemu:allocation-depth, treat all depths > 2 the same as 2,
     * since nbd_client_co_block_status is only expecting the low two
     * bits to be set.
     */
    if (s->alloc_depth && extent->flags > 2) {
        extent->flags = 2;
    }

    return 0;
}

/*
 * nbd_parse_error_payload
 * On success, @request_ret is set to the negative errno corresponding to
 * the server's error reply; @errp is set only on protocol errors.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}
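
/*
 * For reference, the structured error payload parsed above is laid out as
 * (all fields big-endian, per the NBD protocol):
 *
 *     32 bits  error code (translated via nbd_errno_to_system_errno())
 *     16 bits  length of the human-readable message
 *     ...      message bytes (currently only length-checked, not reported)
 */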

static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

#define NBD_MAX_MALLOC_PAYLOAD 1000
static coroutine_fn int nbd_co_receive_structured_payload(
        BDRVNBDState *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/*
 * nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If the function fails, @errp contains the corresponding error message, and
 * the connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    assert(!s->requests[i].receiving);
    if (!nbd_client_connected(s)) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/*
 * nbd_co_receive_one_chunk
 * Read reply, wake up connection_co and move the client into an error state
 * if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        nbd_channel_error(s, ret);
    } else {
        /* For assert at loop start in nbd_connection_entry */
        *reply = s->reply;
    }
    s->reply.handle = 0;

    if (s->connection_co && !s->wait_in_flight) {
        /*
         * We must check s->wait_in_flight, because we may have been entered
         * from nbd_recv_coroutines_wake_all(); in that case we should not
         * wake connection_co here, it will be woken by the last request.
         */
        aio_co_wake(s->connection_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(local_err && *local_err);
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/*
 * NBD_FOREACH_REPLY_CHUNK
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
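
/*
 * A minimal usage sketch of the iterator (mirroring
 * nbd_co_receive_return_code() below): the macro drives one loop iteration
 * per reply chunk, and the caller inspects iter.ret / iter.request_ret /
 * iter.err afterwards.
 *
 *     NBDReplyChunkIter iter;
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 *         // per-chunk handling; payload must be g_free()d each iteration
 *     }
 *     error_propagate(errp, iter.err);
 *     return iter.ret;
 */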

/*
 * nbd_reply_chunk_iter_receive
 * The pointer stored in @payload requires g_free() to free it.
 */
static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (!nbd_client_connected(s)) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was the last one. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple replies. */
    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is the last one. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    if (s->in_flight == 0 && s->wait_in_flight) {
        aio_co_wake(s->connection_co);
    } else {
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /*
             * special cased in nbd_co_receive_one_chunk, data is already
             * in qiov
             */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not an allowed reply type */
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }

    do {
        ret = nbd_co_send_request(bs, request, write_qiov);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_return_code(s, request->handle,
                                         &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request->from, request->len,
                                      request->handle, request->flags,
                                      request->type,
                                      nbd_cmd_lookup(request->type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}
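
/*
 * Retry note: the do/while loops above and below resend a request after a
 * transport error as long as the client is in CONNECTING_WAIT, i.e. while a
 * reconnect within reconnect_delay is still possible.  nbd_co_send_request()
 * itself blocks on free_sema in that state, so a retry only proceeds once
 * the connection has been re-established (or fails once it cannot be).
 */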

static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                                uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= s->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > s->info.size) {
        uint64_t slop = offset + bytes - s->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
                                           &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                                 uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int bytes, BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }
    if (flags & BDRV_REQ_NO_FALLBACK) {
        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
                                  int bytes)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

static int coroutine_fn nbd_client_co_block_status(
        BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
        int64_t *pnum, int64_t *map, BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
                   MIN(bytes, s->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!s->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= s->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (s->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
    }
    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
                                               &extent, &request_ret,
                                               &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
        (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
        BDRV_BLOCK_OFFSET_VALID;
}

static int nbd_client_reopen_prepare(BDRVReopenState *state,
                                     BlockReopenQueue *queue, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;

    if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
        error_setg(errp, "Can't reopen read-only NBD mount as read/write");
        return -EACCES;
    }
    return 0;
}

static void nbd_yank(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
    qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}

static void nbd_client_close(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (s->ioc) {
        nbd_send_request(s->ioc, &request);
    }

    nbd_teardown_connection(bs);
}
1578 
1579 
1580 /*
1581  * Parse nbd_open options
1582  */
1583 
1584 static int nbd_parse_uri(const char *filename, QDict *options)
1585 {
1586     URI *uri;
1587     const char *p;
1588     QueryParams *qp = NULL;
1589     int ret = 0;
1590     bool is_unix;
1591 
1592     uri = uri_parse(filename);
1593     if (!uri) {
1594         return -EINVAL;
1595     }
1596 
1597     /* transport */
1598     if (!g_strcmp0(uri->scheme, "nbd")) {
1599         is_unix = false;
1600     } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
1601         is_unix = false;
1602     } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
1603         is_unix = true;
1604     } else {
1605         ret = -EINVAL;
1606         goto out;
1607     }
1608 
1609     p = uri->path ? uri->path : "";
1610     if (p[0] == '/') {
1611         p++;
1612     }
1613     if (p[0]) {
1614         qdict_put_str(options, "export", p);
1615     }
1616 
1617     qp = query_params_parse(uri->query);
1618     if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
1619         ret = -EINVAL;
1620         goto out;
1621     }
1622 
1623     if (is_unix) {
1624         /* nbd+unix:///export?socket=path */
1625         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
1626             ret = -EINVAL;
1627             goto out;
1628         }
1629         qdict_put_str(options, "server.type", "unix");
1630         qdict_put_str(options, "server.path", qp->p[0].value);
1631     } else {
1632         QString *host;
1633         char *port_str;
1634 
1635         /* nbd[+tcp]://host[:port]/export */
1636         if (!uri->server) {
1637             ret = -EINVAL;
1638             goto out;
1639         }
1640 
1641         /* strip braces from literal IPv6 address */
1642         if (uri->server[0] == '[') {
1643             host = qstring_from_substr(uri->server, 1,
1644                                        strlen(uri->server) - 1);
1645         } else {
1646             host = qstring_from_str(uri->server);
1647         }
1648 
1649         qdict_put_str(options, "server.type", "inet");
1650         qdict_put(options, "server.host", host);
1651 
1652         port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
1653         qdict_put_str(options, "server.port", port_str);
1654         g_free(port_str);
1655     }
1656 
1657 out:
1658     if (qp) {
1659         query_params_free(qp);
1660     }
1661     uri_free(uri);
1662     return ret;
1663 }
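
/*
 * Worked examples of the mapping above (illustrative values):
 *
 *   nbd://example.com/disk0
 *       -> server.type=inet, server.host=example.com,
 *          server.port=10809 (NBD_DEFAULT_PORT), export=disk0
 *   nbd://[::1]:10810
 *       -> server.type=inet, server.host=::1 (braces stripped),
 *          server.port=10810, no export (server's default export)
 *   nbd+unix:///disk0?socket=/tmp/nbd.sock
 *       -> server.type=unix, server.path=/tmp/nbd.sock, export=disk0
 *
 * A query string is accepted only for nbd+unix, and there it must be
 * exactly one socket=<path> parameter; anything else yields -EINVAL.
 */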
1664 
1665 static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
1666 {
1667     const QDictEntry *e;
1668 
1669     for (e = qdict_first(options); e; e = qdict_next(options, e)) {
1670         if (!strcmp(e->key, "host") ||
1671             !strcmp(e->key, "port") ||
1672             !strcmp(e->key, "path") ||
1673             !strcmp(e->key, "export") ||
1674             strstart(e->key, "server.", NULL))
1675         {
1676             error_setg(errp, "Option '%s' cannot be used with a file name",
1677                        e->key);
1678             return true;
1679         }
1680     }
1681 
1682     return false;
1683 }
1684 
1685 static void nbd_parse_filename(const char *filename, QDict *options,
1686                                Error **errp)
1687 {
1688     g_autofree char *file = NULL;
1689     char *export_name;
1690     const char *host_spec;
1691     const char *unixpath;
1692 
1693     if (nbd_has_filename_options_conflict(options, errp)) {
1694         return;
1695     }
1696 
1697     if (strstr(filename, "://")) {
1698         int ret = nbd_parse_uri(filename, options);
1699         if (ret < 0) {
1700             error_setg(errp, "No valid URL specified");
1701         }
1702         return;
1703     }
1704 
1705     file = g_strdup(filename);
1706 
1707     export_name = strstr(file, EN_OPTSTR);
1708     if (export_name) {
1709         if (export_name[strlen(EN_OPTSTR)] == 0) {
1710             return;
1711         }
1712         export_name[0] = 0; /* truncate 'file' */
1713         export_name += strlen(EN_OPTSTR);
1714 
1715         qdict_put_str(options, "export", export_name);
1716     }
1717 
1718     /* extract the host_spec - fail if it's not nbd:... */
1719     if (!strstart(file, "nbd:", &host_spec)) {
1720         error_setg(errp, "File name string for NBD must start with 'nbd:'");
1721         return;
1722     }
1723 
1724     if (!*host_spec) {
1725         return;
1726     }
1727 
1728     /* are we a UNIX or TCP socket? */
1729     if (strstart(host_spec, "unix:", &unixpath)) {
1730         qdict_put_str(options, "server.type", "unix");
1731         qdict_put_str(options, "server.path", unixpath);
1732     } else {
1733         InetSocketAddress *addr = g_new(InetSocketAddress, 1);
1734 
1735         if (inet_parse(addr, host_spec, errp)) {
1736             goto out_inet;
1737         }
1738 
1739         qdict_put_str(options, "server.type", "inet");
1740         qdict_put_str(options, "server.host", addr->host);
1741         qdict_put_str(options, "server.port", addr->port);
1742     out_inet:
1743         qapi_free_InetSocketAddress(addr);
1744     }
1745 }
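
/*
 * The legacy (non-URI) filename forms handled above, with the options
 * they produce (illustrative values):
 *
 *   nbd:unix:/tmp/nbd.sock
 *       -> server.type=unix, server.path=/tmp/nbd.sock
 *   nbd:example.com:10809
 *       -> server.type=inet, host and port split by inet_parse()
 *   nbd:example.com:10809:exportname=disk0
 *       -> additionally export=disk0; the ":exportname=" suffix
 *          (EN_OPTSTR) is cut off before the host spec is parsed
 *
 * A bare "nbd:" is accepted and leaves the options dict untouched.
 */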
1746 
1747 static bool nbd_process_legacy_socket_options(QDict *output_options,
1748                                               QemuOpts *legacy_opts,
1749                                               Error **errp)
1750 {
1751     const char *path = qemu_opt_get(legacy_opts, "path");
1752     const char *host = qemu_opt_get(legacy_opts, "host");
1753     const char *port = qemu_opt_get(legacy_opts, "port");
1754     const QDictEntry *e;
1755 
1756     if (!path && !host && !port) {
1757         return true;
1758     }
1759 
1760     for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
1761     {
1762         if (strstart(e->key, "server.", NULL)) {
1763             error_setg(errp, "Cannot use 'server' and path/host/port at the "
1764                        "same time");
1765             return false;
1766         }
1767     }
1768 
1769     if (path && host) {
1770         error_setg(errp, "path and host may not be used at the same time");
1771         return false;
1772     } else if (path) {
1773         if (port) {
1774             error_setg(errp, "port may not be used without host");
1775             return false;
1776         }
1777 
1778         qdict_put_str(output_options, "server.type", "unix");
1779         qdict_put_str(output_options, "server.path", path);
1780     } else if (host) {
1781         qdict_put_str(output_options, "server.type", "inet");
1782         qdict_put_str(output_options, "server.host", host);
1783         qdict_put_str(output_options, "server.port",
1784                       port ?: stringify(NBD_DEFAULT_PORT));
1785     }
1786 
1787     return true;
1788 }
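
/*
 * Example of the translation above (sketch): the legacy
 *
 *     -drive driver=nbd,host=example.com,port=10810
 *
 * becomes server.type=inet, server.host=example.com, server.port=10810
 * before nbd_config() runs; mixing such legacy keys with explicit
 * server.* keys is rejected rather than silently merged.
 */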
1789 
1790 static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
1791                                  Error **errp)
1792 {
1793     SocketAddress *saddr = NULL;
1794     QDict *addr = NULL;
1795     Visitor *iv = NULL;
1796 
1797     qdict_extract_subqdict(options, &addr, "server.");
1798     if (!qdict_size(addr)) {
1799         error_setg(errp, "NBD server address missing");
1800         goto done;
1801     }
1802 
1803     iv = qobject_input_visitor_new_flat_confused(addr, errp);
1804     if (!iv) {
1805         goto done;
1806     }
1807 
1808     if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
1809         goto done;
1810     }
1811 
1812     if (socket_address_parse_named_fd(saddr, errp) < 0) {
1813         qapi_free_SocketAddress(saddr);
1814         saddr = NULL;
1815         goto done;
1816     }
1817 
1818 done:
1819     qobject_unref(addr);
1820     visit_free(iv);
1821     return saddr;
1822 }
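
/*
 * Rough shape of the conversion above (illustrative, error paths
 * omitted): the flat keys
 *
 *     { "server.type": "inet", "server.host": "h", "server.port": "p" }
 *
 * are extracted into { "type": "inet", "host": "h", "port": "p" } by
 * qdict_extract_subqdict(), and visit_type_SocketAddress() then builds
 *
 *     (SocketAddress){ .type = SOCKET_ADDRESS_TYPE_INET,
 *                      .u.inet = { .host = "h", .port = "p" } };
 *
 * socket_address_parse_named_fd() finally resolves fd addresses that
 * name a monitor file descriptor into its numeric value.
 */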
1823 
1824 static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
1825 {
1826     Object *obj;
1827     QCryptoTLSCreds *creds;
1828 
1829     obj = object_resolve_path_component(
1830         object_get_objects_root(), id);
1831     if (!obj) {
1832         error_setg(errp, "No TLS credentials with id '%s'",
1833                    id);
1834         return NULL;
1835     }
1836     creds = (QCryptoTLSCreds *)
1837         object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
1838     if (!creds) {
1839         error_setg(errp, "Object with id '%s' is not TLS credentials",
1840                    id);
1841         return NULL;
1842     }
1843 
1844     if (!qcrypto_tls_creds_check_endpoint(creds,
1845                                           QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
1846                                           errp)) {
1847         return NULL;
1848     }
1849     object_ref(obj);
1850     return creds;
1851 }
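
/*
 * A typical setup that satisfies this lookup (illustrative command
 * line; the id and directory are placeholders):
 *
 *     -object tls-creds-x509,id=tls0,endpoint=client,dir=/etc/pki/qemu
 *     -blockdev driver=nbd,node-name=nbd0,server.type=inet,
 *               server.host=example.com,server.port=10809,tls-creds=tls0
 *
 * Credentials created with endpoint=server fail the endpoint check
 * above; nbd_process_options() additionally restricts TLS to inet
 * sockets.
 */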
1852 
1853 
1854 static QemuOptsList nbd_runtime_opts = {
1855     .name = "nbd",
1856     .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
1857     .desc = {
1858         {
1859             .name = "host",
1860             .type = QEMU_OPT_STRING,
1861             .help = "TCP host to connect to",
1862         },
1863         {
1864             .name = "port",
1865             .type = QEMU_OPT_STRING,
1866             .help = "TCP port to connect to",
1867         },
1868         {
1869             .name = "path",
1870             .type = QEMU_OPT_STRING,
1871             .help = "Unix socket path to connect to",
1872         },
1873         {
1874             .name = "export",
1875             .type = QEMU_OPT_STRING,
1876             .help = "Name of the NBD export to open",
1877         },
1878         {
1879             .name = "tls-creds",
1880             .type = QEMU_OPT_STRING,
1881             .help = "ID of the TLS credentials to use",
1882         },
1883         {
1884             .name = "x-dirty-bitmap",
1885             .type = QEMU_OPT_STRING,
1886             .help = "experimental: expose named dirty bitmap in place of "
1887                     "block status",
1888         },
1889         {
1890             .name = "reconnect-delay",
1891             .type = QEMU_OPT_NUMBER,
1892             .help = "On an unexpected disconnect, the nbd client tries to "
1893                     "connect again until succeeding or encountering a serious "
1894                     "error.  During the first @reconnect-delay seconds, all "
1895                     "requests are paused and will be rerun on a successful "
1896                     "reconnect. After that time, any delayed requests and all "
1897                     "future requests before a successful reconnect will "
1898                     "immediately fail. Default 0",
1899         },
1900         { /* end of list */ }
1901     },
1902 };
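
/*
 * Example of the reconnect-delay semantics (sketch): with
 * reconnect-delay=30, a request issued 10 seconds after an unexpected
 * disconnect is queued; if the connection is not re-established within
 * the remaining 20 seconds, that request and every later one fails
 * immediately instead of blocking.  With the default of 0, requests
 * start failing as soon as the disconnect is noticed.
 */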
1903 
1904 static int nbd_process_options(BlockDriverState *bs, QDict *options,
1905                                Error **errp)
1906 {
1907     BDRVNBDState *s = bs->opaque;
1908     QemuOpts *opts;
1909     int ret = -EINVAL;
1910 
1911     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
1912     if (!qemu_opts_absorb_qdict(opts, options, errp)) {
1913         goto error;
1914     }
1915 
1916     /* Translate @host, @port, and @path to a SocketAddress */
1917     if (!nbd_process_legacy_socket_options(options, opts, errp)) {
1918         goto error;
1919     }
1920 
1921     /* Pop the config into our state object. Exit if invalid. */
1922     s->saddr = nbd_config(s, options, errp);
1923     if (!s->saddr) {
1924         goto error;
1925     }
1926 
1927     s->export = g_strdup(qemu_opt_get(opts, "export"));
1928     if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
1929         error_setg(errp, "export name too long to send to server");
1930         goto error;
1931     }
1932 
1933     s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
1934     if (s->tlscredsid) {
1935         s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
1936         if (!s->tlscreds) {
1937             goto error;
1938         }
1939 
1940         /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
1941         if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
1942             error_setg(errp, "TLS only supported over IP sockets");
1943             goto error;
1944         }
1945         s->hostname = s->saddr->u.inet.host;
1946     }
1947 
1948     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
1949     if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
1950         error_setg(errp, "x-dirty-bitmap query too long to send to server");
1951         goto error;
1952     }
1953 
1954     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
1955 
1956     ret = 0;
1957 
1958  error:
1959     qemu_opts_del(opts);
1960     return ret;
1961 }
1962 
1963 static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
1964                     Error **errp)
1965 {
1966     int ret;
1967     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1968 
1969     s->bs = bs;
1970     qemu_co_mutex_init(&s->send_mutex);
1971     qemu_co_queue_init(&s->free_sema);
1972 
1973     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
1974         return -EEXIST;
1975     }
1976 
1977     ret = nbd_process_options(bs, options, errp);
1978     if (ret < 0) {
1979         goto fail;
1980     }
1981 
1982     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
1983                                         s->x_dirty_bitmap, s->tlscreds);
1984 
1985     /* TODO: Configurable retry-until-timeout behaviour. */
1986     ret = nbd_do_establish_connection(bs, errp);
1987     if (ret < 0) {
1988         goto fail;
1989     }
1990 
1991     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
1992     bdrv_inc_in_flight(bs);
1993     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
1994 
1995     return 0;
1996 
1997 fail:
1998     nbd_clear_bdrvstate(bs);
1999     return ret;
2000 }
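
/*
 * A matching QMP blockdev-add (sketch; node and export names are
 * placeholders):
 *
 *     { "execute": "blockdev-add",
 *       "arguments": { "driver": "nbd", "node-name": "nbd0",
 *                      "server": { "type": "unix",
 *                                  "path": "/tmp/nbd.sock" },
 *                      "export": "disk0", "reconnect-delay": 30 } }
 *
 * exercises the whole path above: option parsing, yank registration,
 * connection establishment, and scheduling of the reply-reading
 * coroutine.
 */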
2001 
2002 static int nbd_co_flush(BlockDriverState *bs)
2003 {
2004     return nbd_client_co_flush(bs);
2005 }
2006 
2007 static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
2008 {
2009     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
2010     uint32_t min = s->info.min_block;
2011     uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);
2012 
2013     /*
2014      * If the server did not advertise an alignment:
2015      * - a size that is not sector-aligned implies that an alignment
2016      *   of 1 can be used to access those tail bytes
2017      * - advertisement of block status requires an alignment of 1, so
2018      *   that we don't violate block layer constraints that block
2019      *   status is always aligned (as we can't control whether the
2020      *   server will report sub-sector extents, such as a hole at EOF
2021      *   on an unaligned POSIX file)
2022      * - otherwise, assume the server is so old that we are safer avoiding
2023      *   sub-sector requests
2024      */
2025     if (!min) {
2026         min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
2027                s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
2028     }
2029 
2030     bs->bl.request_alignment = min;
2031     bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
2032     bs->bl.max_pwrite_zeroes = max;
2033     bs->bl.max_transfer = max;
2034 
2035     if (s->info.opt_block &&
2036         s->info.opt_block > bs->bl.opt_transfer) {
2037         bs->bl.opt_transfer = s->info.opt_block;
2038     }
2039 }
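
/*
 * Worked example (illustrative): a server that advertises no minimum
 * block size, a 1000-byte export and base:allocation support gets
 * request_alignment 1 (both the unaligned tail and block status demand
 * it); the same server with a 4096-byte export and no block status
 * support gets the conservative BDRV_SECTOR_SIZE instead.  max_transfer
 * is capped at NBD_MAX_BUFFER_SIZE even if the server advertises more.
 */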
2040 
2041 static void nbd_close(BlockDriverState *bs)
2042 {
2043     nbd_client_close(bs);
2044     nbd_clear_bdrvstate(bs);
2045 }
2046 
2047 /*
2048  * NBD cannot truncate, but if the caller asks to truncate to the same size, or
2049  * to a smaller size with exact=false, there is no reason to fail the
2050  * operation.
2051  *
2052  * Preallocation mode is ignored since it does not seem useful to fail when
2053  * we never change anything.
2054  */
2055 static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
2056                                         bool exact, PreallocMode prealloc,
2057                                         BdrvRequestFlags flags, Error **errp)
2058 {
2059     BDRVNBDState *s = bs->opaque;
2060 
2061     if (offset != s->info.size && exact) {
2062         error_setg(errp, "Cannot resize NBD nodes");
2063         return -ENOTSUP;
2064     }
2065 
2066     if (offset > s->info.size) {
2067         error_setg(errp, "Cannot grow NBD nodes");
2068         return -EINVAL;
2069     }
2070 
2071     return 0;
2072 }
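
/*
 * Illustrative outcomes for a 1 GiB export:
 *
 *     truncate to 1 GiB,   any exact   ->  0        (no-op)
 *     truncate to 512 MiB, exact=false ->  0        (shrink tolerated)
 *     truncate to 512 MiB, exact=true  -> -ENOTSUP
 *     truncate to 2 GiB,   exact=true  -> -ENOTSUP
 *     truncate to 2 GiB,   exact=false -> -EINVAL   (cannot grow)
 */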
2073 
2074 static int64_t nbd_getlength(BlockDriverState *bs)
2075 {
2076     BDRVNBDState *s = bs->opaque;
2077 
2078     return s->info.size;
2079 }
2080 
2081 static void nbd_refresh_filename(BlockDriverState *bs)
2082 {
2083     BDRVNBDState *s = bs->opaque;
2084     const char *host = NULL, *port = NULL, *path = NULL;
2085     size_t len = 0;
2086 
2087     if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
2088         const InetSocketAddress *inet = &s->saddr->u.inet;
2089         if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
2090             host = inet->host;
2091             port = inet->port;
2092         }
2093     } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
2094         path = s->saddr->u.q_unix.path;
2095     } /* else can't represent as pseudo-filename */
2096 
2097     if (path && s->export) {
2098         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2099                        "nbd+unix:///%s?socket=%s", s->export, path);
2100     } else if (path && !s->export) {
2101         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2102                        "nbd+unix://?socket=%s", path);
2103     } else if (host && s->export) {
2104         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2105                        "nbd://%s:%s/%s", host, port, s->export);
2106     } else if (host && !s->export) {
2107         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2108                        "nbd://%s:%s", host, port);
2109     }
2110     if (len >= sizeof(bs->exact_filename)) {
2111         /* Name is too long to represent exactly, so leave it empty. */
2112         bs->exact_filename[0] = '\0';
2113     }
2114 }
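
/*
 * Pseudo-filenames generated above (illustrative values):
 *
 *     unix socket + export   -> nbd+unix:///disk0?socket=/tmp/nbd.sock
 *     unix socket, no export -> nbd+unix://?socket=/tmp/nbd.sock
 *     inet + export          -> nbd://example.com:10809/disk0
 *     inet, no export        -> nbd://example.com:10809
 *
 * Addresses that force ipv4/ipv6, use a "to" port range, or are not
 * inet/unix at all cannot be expressed as a filename, so no
 * pseudo-filename is generated for them.
 */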
2115 
2116 static char *nbd_dirname(BlockDriverState *bs, Error **errp)
2117 {
2118     /* The generic bdrv_dirname() implementation is able to work out some
2119      * directory name for NBD nodes, but that would be wrong. So far there is no
2120      * specification for how "export paths" would work, so NBD does not have
2121      * directory names. */
2122     error_setg(errp, "Cannot generate a base directory for NBD nodes");
2123     return NULL;
2124 }
2125 
2126 static const char *const nbd_strong_runtime_opts[] = {
2127     "path",
2128     "host",
2129     "port",
2130     "export",
2131     "tls-creds",
2132     "server.",
2133 
2134     NULL
2135 };
2136 
2137 static void nbd_cancel_in_flight(BlockDriverState *bs)
2138 {
2139     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
2140 
2141     reconnect_delay_timer_del(s);
2142 
2143     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
2144         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
2145         qemu_co_queue_restart_all(&s->free_sema);
2146     }
2147 }
2148 
2149 static BlockDriver bdrv_nbd = {
2150     .format_name                = "nbd",
2151     .protocol_name              = "nbd",
2152     .instance_size              = sizeof(BDRVNBDState),
2153     .bdrv_parse_filename        = nbd_parse_filename,
2154     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2155     .create_opts                = &bdrv_create_opts_simple,
2156     .bdrv_file_open             = nbd_open,
2157     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2158     .bdrv_co_preadv             = nbd_client_co_preadv,
2159     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2160     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2161     .bdrv_close                 = nbd_close,
2162     .bdrv_co_flush_to_os        = nbd_co_flush,
2163     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2164     .bdrv_refresh_limits        = nbd_refresh_limits,
2165     .bdrv_co_truncate           = nbd_co_truncate,
2166     .bdrv_getlength             = nbd_getlength,
2167     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2168     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2169     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2170     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2171     .bdrv_refresh_filename      = nbd_refresh_filename,
2172     .bdrv_co_block_status       = nbd_client_co_block_status,
2173     .bdrv_dirname               = nbd_dirname,
2174     .strong_runtime_opts        = nbd_strong_runtime_opts,
2175     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2176 };
2177 
2178 static BlockDriver bdrv_nbd_tcp = {
2179     .format_name                = "nbd",
2180     .protocol_name              = "nbd+tcp",
2181     .instance_size              = sizeof(BDRVNBDState),
2182     .bdrv_parse_filename        = nbd_parse_filename,
2183     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2184     .create_opts                = &bdrv_create_opts_simple,
2185     .bdrv_file_open             = nbd_open,
2186     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2187     .bdrv_co_preadv             = nbd_client_co_preadv,
2188     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2189     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2190     .bdrv_close                 = nbd_close,
2191     .bdrv_co_flush_to_os        = nbd_co_flush,
2192     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2193     .bdrv_refresh_limits        = nbd_refresh_limits,
2194     .bdrv_co_truncate           = nbd_co_truncate,
2195     .bdrv_getlength             = nbd_getlength,
2196     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2197     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2198     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2199     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2200     .bdrv_refresh_filename      = nbd_refresh_filename,
2201     .bdrv_co_block_status       = nbd_client_co_block_status,
2202     .bdrv_dirname               = nbd_dirname,
2203     .strong_runtime_opts        = nbd_strong_runtime_opts,
2204     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2205 };
2206 
2207 static BlockDriver bdrv_nbd_unix = {
2208     .format_name                = "nbd",
2209     .protocol_name              = "nbd+unix",
2210     .instance_size              = sizeof(BDRVNBDState),
2211     .bdrv_parse_filename        = nbd_parse_filename,
2212     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2213     .create_opts                = &bdrv_create_opts_simple,
2214     .bdrv_file_open             = nbd_open,
2215     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2216     .bdrv_co_preadv             = nbd_client_co_preadv,
2217     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2218     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2219     .bdrv_close                 = nbd_close,
2220     .bdrv_co_flush_to_os        = nbd_co_flush,
2221     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2222     .bdrv_refresh_limits        = nbd_refresh_limits,
2223     .bdrv_co_truncate           = nbd_co_truncate,
2224     .bdrv_getlength             = nbd_getlength,
2225     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
2226     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
2227     .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
2228     .bdrv_co_drain_end          = nbd_client_co_drain_end,
2229     .bdrv_refresh_filename      = nbd_refresh_filename,
2230     .bdrv_co_block_status       = nbd_client_co_block_status,
2231     .bdrv_dirname               = nbd_dirname,
2232     .strong_runtime_opts        = nbd_strong_runtime_opts,
2233     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2234 };
2235 
2236 static void bdrv_nbd_init(void)
2237 {
2238     bdrv_register(&bdrv_nbd);
2239     bdrv_register(&bdrv_nbd_tcp);
2240     bdrv_register(&bdrv_nbd_unix);
2241 }
2242 
2243 block_init(bdrv_nbd_init);
2244