xref: /qemu/nbd/server.c (revision 7a4e543d)
1 /*
2  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
3  *
4  *  Network Block Device Server Side
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; under version 2 of the License.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "qemu/osdep.h"
20 #include "nbd-internal.h"
21 
22 static int system_errno_to_nbd_errno(int err)
23 {
24     switch (err) {
25     case 0:
26         return NBD_SUCCESS;
27     case EPERM:
28         return NBD_EPERM;
29     case EIO:
30         return NBD_EIO;
31     case ENOMEM:
32         return NBD_ENOMEM;
33 #ifdef EDQUOT
34     case EDQUOT:
35 #endif
36     case EFBIG:
37     case ENOSPC:
38         return NBD_ENOSPC;
39     case EINVAL:
40     default:
41         return NBD_EINVAL;
42     }
43 }
44 
45 /* Definitions for opaque data types */
46 
typedef struct NBDRequest NBDRequest;

/*
 * One in-flight request.  Created by nbd_request_get() and destroyed by
 * nbd_request_put().
 */
struct NBDRequest {
    /* Queue linkage; not referenced by the code visible in this file. */
    QSIMPLEQ_ENTRY(NBDRequest) entry;
    /* Client the request belongs to; nbd_request_get() takes a reference. */
    NBDClient *client;
    /* Payload buffer from blk_try_blockalign(), or NULL if none is needed. */
    uint8_t *data;
};
54 
/* A block backend made available to NBD clients. */
struct NBDExport {
    /* Reference count; the export is torn down when it reaches zero. */
    int refcount;
    /* Optional callback run by nbd_export_put() on the last reference. */
    void (*close)(NBDExport *exp);

    /* Backend serving the data; referenced with blk_ref() on creation. */
    BlockBackend *blk;
    /* Heap-allocated name, or NULL while the export is not in 'exports'. */
    char *name;
    /* Added to the client-supplied offset before accessing the backend. */
    off_t dev_offset;
    /* Export length in bytes, rounded down to a BDRV_SECTOR_SIZE multiple. */
    off_t size;
    /* Export flags advertised to the client during negotiation. */
    uint32_t nbdflags;
    /* Clients currently attached to this export. */
    QTAILQ_HEAD(, NBDClient) clients;
    /* Linkage in the global 'exports' list. */
    QTAILQ_ENTRY(NBDExport) next;

    /* AioContext the client fd handlers are registered in; NULL while
     * detached (see blk_aio_detach()). */
    AioContext *ctx;

    /* Closes the export when the backend's BlockDriverState is removed. */
    Notifier eject_notifier;
};
71 
/* Every export that currently has a name; maintained by
 * nbd_export_set_name(). */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
73 
/* State of one connected client socket. */
struct NBDClient {
    /* Reference count; see nbd_client_get() / nbd_client_put(). */
    int refcount;
    /* Optional callback invoked once from client_close(). */
    void (*close)(NBDClient *client);

    /* Export being served; may be NULL until negotiation selects one. */
    NBDExport *exp;
    /* Connected socket; closed by nbd_client_put() on the last reference. */
    int sock;

    /* Coroutine currently inside nbd_co_receive_request(), else NULL. */
    Coroutine *recv_coroutine;

    /* Serialises replies so that header and payload are not interleaved. */
    CoMutex send_lock;
    /* Coroutine currently inside nbd_co_send_reply(), else NULL. */
    Coroutine *send_coroutine;

    /* Whether a read handler should be installed on the socket. */
    bool can_read;

    /* Linkage in NBDExport.clients. */
    QTAILQ_ENTRY(NBDClient) next;
    /* Number of live NBDRequest objects; bounded by MAX_NBD_REQUESTS. */
    int nb_requests;
    /* Set by client_close(); no new requests are started afterwards. */
    bool closing;
};
92 
93 /* That's all folks */
94 
/* Forward declarations: these helpers are defined near the end of the file
 * but are used by the request and reference-counting code before that. */
static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
98 
99 static void nbd_negotiate_continue(void *opaque)
100 {
101     qemu_coroutine_enter(opaque, NULL);
102 }
103 
104 static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
105 {
106     ssize_t ret;
107 
108     assert(qemu_in_coroutine());
109     /* Negotiation are always in main loop. */
110     qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
111                         qemu_coroutine_self());
112     ret = read_sync(fd, buffer, size);
113     qemu_set_fd_handler(fd, NULL, NULL, NULL);
114     return ret;
115 
116 }
117 
118 static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
119 {
120     ssize_t ret;
121 
122     assert(qemu_in_coroutine());
123     /* Negotiation are always in main loop. */
124     qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
125                         qemu_coroutine_self());
126     ret = write_sync(fd, buffer, size);
127     qemu_set_fd_handler(fd, NULL, NULL, NULL);
128     return ret;
129 }
130 
/*
 * Read and discard @size bytes from @fd during negotiation.
 *
 * Returns @size once everything has been consumed, the negative value
 * returned by nbd_negotiate_read() if a read fails, or -EIO if a read
 * makes no progress.
 */
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{
    const size_t chunk_max = 65536;
    ssize_t result = size;
    uint8_t *buffer = g_malloc(MIN(chunk_max, size));

    while (size > 0) {
        ssize_t n = nbd_negotiate_read(fd, buffer, MIN(chunk_max, size));

        if (n < 0) {
            result = n;
            break;
        }
        if (n == 0) {
            /* Nothing was read (presumably the peer closed the connection;
             * confirm against read_sync()).  Without this check 'size'
             * would never shrink and the loop would spin forever. */
            result = -EIO;
            break;
        }

        assert((size_t)n <= size);
        size -= n;
    }

    g_free(buffer);
    return result;
}
150 
151 /* Basic flow for negotiation
152 
153    Server         Client
154    Negotiate
155 
156    or
157 
158    Server         Client
159    Negotiate #1
160                   Option
161    Negotiate #2
162 
163    ----
164 
165    followed by
166 
167    Server         Client
168                   Request
169    Response
170                   Request
171    Response
172                   ...
173    ...
174                   Request (type == 2)
175 
176 */
177 
178 static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
179 {
180     uint64_t magic;
181     uint32_t len;
182 
183     magic = cpu_to_be64(NBD_REP_MAGIC);
184     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
185         LOG("write failed (rep magic)");
186         return -EINVAL;
187     }
188     opt = cpu_to_be32(opt);
189     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
190         LOG("write failed (rep opt)");
191         return -EINVAL;
192     }
193     type = cpu_to_be32(type);
194     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
195         LOG("write failed (rep type)");
196         return -EINVAL;
197     }
198     len = cpu_to_be32(0);
199     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
200         LOG("write failed (rep data length)");
201         return -EINVAL;
202     }
203     return 0;
204 }
205 
206 static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
207 {
208     uint64_t magic, name_len;
209     uint32_t opt, type, len;
210 
211     name_len = strlen(exp->name);
212     magic = cpu_to_be64(NBD_REP_MAGIC);
213     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
214         LOG("write failed (magic)");
215         return -EINVAL;
216      }
217     opt = cpu_to_be32(NBD_OPT_LIST);
218     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
219         LOG("write failed (opt)");
220         return -EINVAL;
221     }
222     type = cpu_to_be32(NBD_REP_SERVER);
223     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
224         LOG("write failed (reply type)");
225         return -EINVAL;
226     }
227     len = cpu_to_be32(name_len + sizeof(len));
228     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
229         LOG("write failed (length)");
230         return -EINVAL;
231     }
232     len = cpu_to_be32(name_len);
233     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
234         LOG("write failed (length)");
235         return -EINVAL;
236     }
237     if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
238         LOG("write failed (buffer)");
239         return -EINVAL;
240     }
241     return 0;
242 }
243 
244 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
245 {
246     int csock;
247     NBDExport *exp;
248 
249     csock = client->sock;
250     if (length) {
251         if (nbd_negotiate_drop_sync(csock, length) != length) {
252             return -EIO;
253         }
254         return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
255     }
256 
257     /* For each export, send a NBD_REP_SERVER reply. */
258     QTAILQ_FOREACH(exp, &exports, next) {
259         if (nbd_negotiate_send_rep_list(csock, exp)) {
260             return -EINVAL;
261         }
262     }
263     /* Finish with a NBD_REP_ACK. */
264     return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
265 }
266 
267 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
268 {
269     int rc = -EINVAL, csock = client->sock;
270     char name[256];
271 
272     /* Client sends:
273         [20 ..  xx]   export name (length bytes)
274      */
275     TRACE("Checking length");
276     if (length > 255) {
277         LOG("Bad length received");
278         goto fail;
279     }
280     if (nbd_negotiate_read(csock, name, length) != length) {
281         LOG("read failed");
282         goto fail;
283     }
284     name[length] = '\0';
285 
286     client->exp = nbd_export_find(name);
287     if (!client->exp) {
288         LOG("export not found");
289         goto fail;
290     }
291 
292     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
293     nbd_export_get(client->exp);
294     rc = 0;
295 fail:
296     return rc;
297 }
298 
299 static int nbd_negotiate_options(NBDClient *client)
300 {
301     int csock = client->sock;
302     uint32_t flags;
303 
304     /* Client sends:
305         [ 0 ..   3]   client flags
306 
307         [ 0 ..   7]   NBD_OPTS_MAGIC
308         [ 8 ..  11]   NBD option
309         [12 ..  15]   Data length
310         ...           Rest of request
311 
312         [ 0 ..   7]   NBD_OPTS_MAGIC
313         [ 8 ..  11]   Second NBD option
314         [12 ..  15]   Data length
315         ...           Rest of request
316     */
317 
318     if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
319         LOG("read failed");
320         return -EIO;
321     }
322     TRACE("Checking client flags");
323     be32_to_cpus(&flags);
324     if (flags != 0 && flags != NBD_FLAG_C_FIXED_NEWSTYLE) {
325         LOG("Bad client flags received");
326         return -EIO;
327     }
328 
329     while (1) {
330         int ret;
331         uint32_t tmp, length;
332         uint64_t magic;
333 
334         if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
335             LOG("read failed");
336             return -EINVAL;
337         }
338         TRACE("Checking opts magic");
339         if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
340             LOG("Bad magic received");
341             return -EINVAL;
342         }
343 
344         if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
345             LOG("read failed");
346             return -EINVAL;
347         }
348 
349         if (nbd_negotiate_read(csock, &length,
350                                sizeof(length)) != sizeof(length)) {
351             LOG("read failed");
352             return -EINVAL;
353         }
354         length = be32_to_cpu(length);
355 
356         TRACE("Checking option");
357         switch (be32_to_cpu(tmp)) {
358         case NBD_OPT_LIST:
359             ret = nbd_negotiate_handle_list(client, length);
360             if (ret < 0) {
361                 return ret;
362             }
363             break;
364 
365         case NBD_OPT_ABORT:
366             return -EINVAL;
367 
368         case NBD_OPT_EXPORT_NAME:
369             return nbd_negotiate_handle_export_name(client, length);
370 
371         default:
372             tmp = be32_to_cpu(tmp);
373             LOG("Unsupported option 0x%x", tmp);
374             nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
375             return -EINVAL;
376         }
377     }
378 }
379 
/* Argument bundle handed from nbd_client_new() to the negotiation
 * coroutine; freed by nbd_co_client_start(). */
typedef struct {
    NBDClient *client;  /* client being set up */
    Coroutine *co;      /* coroutine running nbd_co_client_start() */
} NBDClientNewData;
384 
385 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
386 {
387     NBDClient *client = data->client;
388     int csock = client->sock;
389     char buf[8 + 8 + 8 + 128];
390     int rc;
391     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
392                          NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
393 
394     /* Negotiation header without options:
395         [ 0 ..   7]   passwd       ("NBDMAGIC")
396         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
397         [16 ..  23]   size
398         [24 ..  25]   server flags (0)
399         [26 ..  27]   export flags
400         [28 .. 151]   reserved     (0)
401 
402        Negotiation header with options, part 1:
403         [ 0 ..   7]   passwd       ("NBDMAGIC")
404         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
405         [16 ..  17]   server flags (0)
406 
407        part 2 (after options are sent):
408         [18 ..  25]   size
409         [26 ..  27]   export flags
410         [28 .. 151]   reserved     (0)
411      */
412 
413     rc = -EINVAL;
414 
415     TRACE("Beginning negotiation.");
416     memset(buf, 0, sizeof(buf));
417     memcpy(buf, "NBDMAGIC", 8);
418     if (client->exp) {
419         assert ((client->exp->nbdflags & ~65535) == 0);
420         stq_be_p(buf + 8, NBD_CLIENT_MAGIC);
421         stq_be_p(buf + 16, client->exp->size);
422         stw_be_p(buf + 26, client->exp->nbdflags | myflags);
423     } else {
424         stq_be_p(buf + 8, NBD_OPTS_MAGIC);
425         stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE);
426     }
427 
428     if (client->exp) {
429         if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
430             LOG("write failed");
431             goto fail;
432         }
433     } else {
434         if (nbd_negotiate_write(csock, buf, 18) != 18) {
435             LOG("write failed");
436             goto fail;
437         }
438         rc = nbd_negotiate_options(client);
439         if (rc != 0) {
440             LOG("option negotiation failed");
441             goto fail;
442         }
443 
444         assert ((client->exp->nbdflags & ~65535) == 0);
445         stq_be_p(buf + 18, client->exp->size);
446         stw_be_p(buf + 26, client->exp->nbdflags | myflags);
447         if (nbd_negotiate_write(csock, buf + 18,
448                                 sizeof(buf) - 18) != sizeof(buf) - 18) {
449             LOG("write failed");
450             goto fail;
451         }
452     }
453 
454     TRACE("Negotiation succeeded.");
455     rc = 0;
456 fail:
457     return rc;
458 }
459 
460 #ifdef __linux__
461 
462 int nbd_disconnect(int fd)
463 {
464     ioctl(fd, NBD_CLEAR_QUE);
465     ioctl(fd, NBD_DISCONNECT);
466     ioctl(fd, NBD_CLEAR_SOCK);
467     return 0;
468 }
469 
470 #else
471 
472 int nbd_disconnect(int fd)
473 {
474     return -ENOTSUP;
475 }
476 #endif
477 
478 static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
479 {
480     uint8_t buf[NBD_REQUEST_SIZE];
481     uint32_t magic;
482     ssize_t ret;
483 
484     ret = read_sync(csock, buf, sizeof(buf));
485     if (ret < 0) {
486         return ret;
487     }
488 
489     if (ret != sizeof(buf)) {
490         LOG("read failed");
491         return -EINVAL;
492     }
493 
494     /* Request
495        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
496        [ 4 ..  7]   type    (0 == READ, 1 == WRITE)
497        [ 8 .. 15]   handle
498        [16 .. 23]   from
499        [24 .. 27]   len
500      */
501 
502     magic = be32_to_cpup((uint32_t*)buf);
503     request->type  = be32_to_cpup((uint32_t*)(buf + 4));
504     request->handle = be64_to_cpup((uint64_t*)(buf + 8));
505     request->from  = be64_to_cpup((uint64_t*)(buf + 16));
506     request->len   = be32_to_cpup((uint32_t*)(buf + 24));
507 
508     TRACE("Got request: "
509           "{ magic = 0x%x, .type = %d, from = %" PRIu64" , len = %u }",
510           magic, request->type, request->from, request->len);
511 
512     if (magic != NBD_REQUEST_MAGIC) {
513         LOG("invalid magic (got 0x%x)", magic);
514         return -EINVAL;
515     }
516     return 0;
517 }
518 
519 static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
520 {
521     uint8_t buf[NBD_REPLY_SIZE];
522     ssize_t ret;
523 
524     reply->error = system_errno_to_nbd_errno(reply->error);
525 
526     /* Reply
527        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
528        [ 4 ..  7]    error   (0 == no error)
529        [ 7 .. 15]    handle
530      */
531     stl_be_p(buf, NBD_REPLY_MAGIC);
532     stl_be_p(buf + 4, reply->error);
533     stq_be_p(buf + 8, reply->handle);
534 
535     TRACE("Sending response to client");
536 
537     ret = write_sync(csock, buf, sizeof(buf));
538     if (ret < 0) {
539         return ret;
540     }
541 
542     if (ret != sizeof(buf)) {
543         LOG("writing to socket failed");
544         return -EINVAL;
545     }
546     return 0;
547 }
548 
/* Upper bound on simultaneously live requests per client; once reached,
 * nbd_update_can_read() stops reading unless a receive is in progress. */
#define MAX_NBD_REQUESTS 16
550 
551 void nbd_client_get(NBDClient *client)
552 {
553     client->refcount++;
554 }
555 
556 void nbd_client_put(NBDClient *client)
557 {
558     if (--client->refcount == 0) {
559         /* The last reference should be dropped by client->close,
560          * which is called by client_close.
561          */
562         assert(client->closing);
563 
564         nbd_unset_handlers(client);
565         close(client->sock);
566         client->sock = -1;
567         if (client->exp) {
568             QTAILQ_REMOVE(&client->exp->clients, client, next);
569             nbd_export_put(client->exp);
570         }
571         g_free(client);
572     }
573 }
574 
575 static void client_close(NBDClient *client)
576 {
577     if (client->closing) {
578         return;
579     }
580 
581     client->closing = true;
582 
583     /* Force requests to finish.  They will drop their own references,
584      * then we'll close the socket and free the NBDClient.
585      */
586     shutdown(client->sock, 2);
587 
588     /* Also tell the client, so that they release their reference.  */
589     if (client->close) {
590         client->close(client);
591     }
592 }
593 
594 static NBDRequest *nbd_request_get(NBDClient *client)
595 {
596     NBDRequest *req;
597 
598     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
599     client->nb_requests++;
600     nbd_update_can_read(client);
601 
602     req = g_new0(NBDRequest, 1);
603     nbd_client_get(client);
604     req->client = client;
605     return req;
606 }
607 
608 static void nbd_request_put(NBDRequest *req)
609 {
610     NBDClient *client = req->client;
611 
612     if (req->data) {
613         qemu_vfree(req->data);
614     }
615     g_free(req);
616 
617     client->nb_requests--;
618     nbd_update_can_read(client);
619     nbd_client_put(client);
620 }
621 
622 static void blk_aio_attached(AioContext *ctx, void *opaque)
623 {
624     NBDExport *exp = opaque;
625     NBDClient *client;
626 
627     TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
628 
629     exp->ctx = ctx;
630 
631     QTAILQ_FOREACH(client, &exp->clients, next) {
632         nbd_set_handlers(client);
633     }
634 }
635 
636 static void blk_aio_detach(void *opaque)
637 {
638     NBDExport *exp = opaque;
639     NBDClient *client;
640 
641     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
642 
643     QTAILQ_FOREACH(client, &exp->clients, next) {
644         nbd_unset_handlers(client);
645     }
646 
647     exp->ctx = NULL;
648 }
649 
650 static void nbd_eject_notifier(Notifier *n, void *data)
651 {
652     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
653     nbd_export_close(exp);
654 }
655 
656 NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
657                           uint32_t nbdflags, void (*close)(NBDExport *),
658                           Error **errp)
659 {
660     NBDExport *exp = g_malloc0(sizeof(NBDExport));
661     exp->refcount = 1;
662     QTAILQ_INIT(&exp->clients);
663     exp->blk = blk;
664     exp->dev_offset = dev_offset;
665     exp->nbdflags = nbdflags;
666     exp->size = size < 0 ? blk_getlength(blk) : size;
667     if (exp->size < 0) {
668         error_setg_errno(errp, -exp->size,
669                          "Failed to determine the NBD export's length");
670         goto fail;
671     }
672     exp->size -= exp->size % BDRV_SECTOR_SIZE;
673 
674     exp->close = close;
675     exp->ctx = blk_get_aio_context(blk);
676     blk_ref(blk);
677     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
678 
679     exp->eject_notifier.notify = nbd_eject_notifier;
680     blk_add_remove_bs_notifier(blk, &exp->eject_notifier);
681 
682     /*
683      * NBD exports are used for non-shared storage migration.  Make sure
684      * that BDRV_O_INACTIVE is cleared and the image is ready for write
685      * access since the export could be available before migration handover.
686      */
687     aio_context_acquire(exp->ctx);
688     blk_invalidate_cache(blk, NULL);
689     aio_context_release(exp->ctx);
690     return exp;
691 
692 fail:
693     g_free(exp);
694     return NULL;
695 }
696 
697 NBDExport *nbd_export_find(const char *name)
698 {
699     NBDExport *exp;
700     QTAILQ_FOREACH(exp, &exports, next) {
701         if (strcmp(name, exp->name) == 0) {
702             return exp;
703         }
704     }
705 
706     return NULL;
707 }
708 
/*
 * Give @exp the name @name, or remove its name if @name is NULL.
 *
 * A named export is linked into the global 'exports' list and that
 * membership owns one reference; an unnamed export is not in the list.
 * @name is copied.
 */
void nbd_export_set_name(NBDExport *exp, const char *name)
{
    /* Pointer comparison: covers NULL -> NULL and re-setting the very same
     * string object. */
    if (exp->name == name) {
        return;
    }

    /* Temporary reference so that dropping the list's reference below
     * cannot free the export while it is still being modified. */
    nbd_export_get(exp);
    if (exp->name != NULL) {
        g_free(exp->name);
        exp->name = NULL;
        QTAILQ_REMOVE(&exports, exp, next);
        /* Release the reference owned by list membership. */
        nbd_export_put(exp);
    }
    if (name != NULL) {
        /* Reference owned by list membership. */
        nbd_export_get(exp);
        exp->name = g_strdup(name);
        QTAILQ_INSERT_TAIL(&exports, exp, next);
    }
    /* Drop the temporary reference; this may free the export. */
    nbd_export_put(exp);
}
729 
/*
 * Close every client attached to @exp and remove the export's name, which
 * also unlinks it from the global list.
 */
void nbd_export_close(NBDExport *exp)
{
    NBDClient *client, *next;

    /* Hold a reference for the duration: closing clients and clearing the
     * name both release references on the export. */
    nbd_export_get(exp);
    /* _SAFE variant: client_close() may lead to the client being unlinked
     * from the list while it is being walked. */
    QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
        client_close(client);
    }
    nbd_export_set_name(exp, NULL);
    nbd_export_put(exp);
}
741 
742 void nbd_export_get(NBDExport *exp)
743 {
744     assert(exp->refcount > 0);
745     exp->refcount++;
746 }
747 
/*
 * Drop one reference on @exp and destroy the export when it was the last.
 */
void nbd_export_put(NBDExport *exp)
{
    assert(exp->refcount > 0);
    /* About to drop the last reference: close the export first.
     * nbd_export_close() takes and releases a reference of its own, so the
     * nested put sees a count of 2 and does not re-enter this branch. */
    if (exp->refcount == 1) {
        nbd_export_close(exp);
    }

    if (--exp->refcount == 0) {
        /* A named export still holds the list's reference, so the count
         * cannot reach zero while a name is set. */
        assert(exp->name == NULL);

        if (exp->close) {
            exp->close(exp);
        }

        if (exp->blk) {
            /* Undo the registrations made in nbd_export_new(), then give
             * back the backend reference taken there. */
            notifier_remove(&exp->eject_notifier);
            blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
                                            blk_aio_detach, exp);
            blk_unref(exp->blk);
            exp->blk = NULL;
        }

        g_free(exp);
    }
}
773 
774 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
775 {
776     return exp->blk;
777 }
778 
779 void nbd_export_close_all(void)
780 {
781     NBDExport *exp, *next;
782 
783     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
784         nbd_export_close(exp);
785     }
786 }
787 
788 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
789                                  int len)
790 {
791     NBDClient *client = req->client;
792     int csock = client->sock;
793     ssize_t rc, ret;
794 
795     qemu_co_mutex_lock(&client->send_lock);
796     client->send_coroutine = qemu_coroutine_self();
797     nbd_set_handlers(client);
798 
799     if (!len) {
800         rc = nbd_send_reply(csock, reply);
801     } else {
802         socket_set_cork(csock, 1);
803         rc = nbd_send_reply(csock, reply);
804         if (rc >= 0) {
805             ret = qemu_co_send(csock, req->data, len);
806             if (ret != len) {
807                 rc = -EIO;
808             }
809         }
810         socket_set_cork(csock, 0);
811     }
812 
813     client->send_coroutine = NULL;
814     nbd_set_handlers(client);
815     qemu_co_mutex_unlock(&client->send_lock);
816     return rc;
817 }
818 
819 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
820 {
821     NBDClient *client = req->client;
822     int csock = client->sock;
823     uint32_t command;
824     ssize_t rc;
825 
826     client->recv_coroutine = qemu_coroutine_self();
827     nbd_update_can_read(client);
828 
829     rc = nbd_receive_request(csock, request);
830     if (rc < 0) {
831         if (rc != -EAGAIN) {
832             rc = -EIO;
833         }
834         goto out;
835     }
836 
837     if ((request->from + request->len) < request->from) {
838         LOG("integer overflow detected! "
839             "you're probably being attacked");
840         rc = -EINVAL;
841         goto out;
842     }
843 
844     TRACE("Decoding type");
845 
846     command = request->type & NBD_CMD_MASK_COMMAND;
847     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
848         if (request->len > NBD_MAX_BUFFER_SIZE) {
849             LOG("len (%u) is larger than max len (%u)",
850                 request->len, NBD_MAX_BUFFER_SIZE);
851             rc = -EINVAL;
852             goto out;
853         }
854 
855         req->data = blk_try_blockalign(client->exp->blk, request->len);
856         if (req->data == NULL) {
857             rc = -ENOMEM;
858             goto out;
859         }
860     }
861     if (command == NBD_CMD_WRITE) {
862         TRACE("Reading %u byte(s)", request->len);
863 
864         if (qemu_co_recv(csock, req->data, request->len) != request->len) {
865             LOG("reading from socket failed");
866             rc = -EIO;
867             goto out;
868         }
869     }
870     rc = 0;
871 
872 out:
873     client->recv_coroutine = NULL;
874     nbd_update_can_read(client);
875 
876     return rc;
877 }
878 
/*
 * Coroutine entry point: receive one request from the client passed as
 * @opaque, execute it against the export and send the reply.
 *
 * Exit paths:
 *   done - the request is released and the connection stays open;
 *   out  - the request is released and the client is closed.
 */
static void nbd_trip(void *opaque)
{
    NBDClient *client = opaque;
    NBDExport *exp = client->exp;
    NBDRequest *req;
    struct nbd_request request;
    struct nbd_reply reply;
    ssize_t ret;
    uint32_t command;

    TRACE("Reading request.");
    if (client->closing) {
        return;
    }

    req = nbd_request_get(client);
    ret = nbd_co_receive_request(req, &request);
    /* -EAGAIN: nothing to process, keep the connection. */
    if (ret == -EAGAIN) {
        goto done;
    }
    /* -EIO: the stream cannot be trusted any more, close without a reply. */
    if (ret == -EIO) {
        goto out;
    }

    reply.handle = request.handle;
    reply.error = 0;

    /* Any other failure has a valid header and is answered with an error. */
    if (ret < 0) {
        reply.error = -ret;
        goto error_reply;
    }
    command = request.type & NBD_CMD_MASK_COMMAND;
    /* Every command except disconnect must stay inside the export. */
    if (command != NBD_CMD_DISC && (request.from + request.len) > exp->size) {
            LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
            ", Offset: %" PRIu64 "\n",
                    request.from, request.len,
                    (uint64_t)exp->size, (uint64_t)exp->dev_offset);
        LOG("requested operation past EOF--bad client?");
        goto invalid_request;
    }

    if (client->closing) {
        /*
         * The client may be closed when we are blocked in
         * nbd_co_receive_request()
         */
        goto done;
    }

    switch (command) {
    case NBD_CMD_READ:
        TRACE("Request type is READ");

        /* With FUA set, flush before reading. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        /* Offset and length are converted to sector units by integer
         * division. */
        ret = blk_read(exp->blk,
                       (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                       req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("reading from file failed");
            reply.error = -ret;
            goto error_reply;
        }

        TRACE("Read %u byte(s)", request.len);
        /* The data just read is sent right behind the reply header. */
        if (nbd_co_send_reply(req, &reply, request.len) < 0)
            goto out;
        break;
    case NBD_CMD_WRITE:
        TRACE("Request type is WRITE");

        if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
            TRACE("Server is read-only, return error");
            reply.error = EROFS;
            goto error_reply;
        }

        TRACE("Writing to device");

        ret = blk_write(exp->blk,
                        (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                        req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("writing to file failed");
            reply.error = -ret;
            goto error_reply;
        }

        /* With FUA set, flush after writing and before acknowledging. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_DISC:
        TRACE("Request type is DISCONNECT");
        errno = 0;
        /* No reply is sent for a disconnect; just close the client. */
        goto out;
    case NBD_CMD_FLUSH:
        TRACE("Request type is FLUSH");

        ret = blk_co_flush(exp->blk);
        if (ret < 0) {
            LOG("flush failed");
            reply.error = -ret;
        }
        /* Success and failure share the same reply path here. */
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_TRIM:
        TRACE("Request type is TRIM");
        ret = blk_co_discard(exp->blk, (request.from + exp->dev_offset)
                                       / BDRV_SECTOR_SIZE,
                             request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("discard failed");
            reply.error = -ret;
        }
        /* Success and failure share the same reply path here. */
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    default:
        LOG("invalid request type (%u) received", request.type);
    /* The two labels below are jump targets from outside the switch; the
     * default case falls through both of them. */
    invalid_request:
        reply.error = EINVAL;
    error_reply:
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    }

    TRACE("Request/Reply complete");

done:
    nbd_request_put(req);
    return;

out:
    nbd_request_put(req);
    client_close(client);
}
1037 
1038 static void nbd_read(void *opaque)
1039 {
1040     NBDClient *client = opaque;
1041 
1042     if (client->recv_coroutine) {
1043         qemu_coroutine_enter(client->recv_coroutine, NULL);
1044     } else {
1045         qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
1046     }
1047 }
1048 
1049 static void nbd_restart_write(void *opaque)
1050 {
1051     NBDClient *client = opaque;
1052 
1053     qemu_coroutine_enter(client->send_coroutine, NULL);
1054 }
1055 
1056 static void nbd_set_handlers(NBDClient *client)
1057 {
1058     if (client->exp && client->exp->ctx) {
1059         aio_set_fd_handler(client->exp->ctx, client->sock,
1060                            true,
1061                            client->can_read ? nbd_read : NULL,
1062                            client->send_coroutine ? nbd_restart_write : NULL,
1063                            client);
1064     }
1065 }
1066 
1067 static void nbd_unset_handlers(NBDClient *client)
1068 {
1069     if (client->exp && client->exp->ctx) {
1070         aio_set_fd_handler(client->exp->ctx, client->sock,
1071                            true, NULL, NULL, NULL);
1072     }
1073 }
1074 
1075 static void nbd_update_can_read(NBDClient *client)
1076 {
1077     bool can_read = client->recv_coroutine ||
1078                     client->nb_requests < MAX_NBD_REQUESTS;
1079 
1080     if (can_read != client->can_read) {
1081         client->can_read = can_read;
1082         nbd_set_handlers(client);
1083 
1084         /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
1085          * in nbd_set_handlers() will have taken care of that */
1086     }
1087 }
1088 
1089 static coroutine_fn void nbd_co_client_start(void *opaque)
1090 {
1091     NBDClientNewData *data = opaque;
1092     NBDClient *client = data->client;
1093     NBDExport *exp = client->exp;
1094 
1095     if (exp) {
1096         nbd_export_get(exp);
1097     }
1098     if (nbd_negotiate(data)) {
1099         client_close(client);
1100         goto out;
1101     }
1102     qemu_co_mutex_init(&client->send_lock);
1103     nbd_set_handlers(client);
1104 
1105     if (exp) {
1106         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1107     }
1108 out:
1109     g_free(data);
1110 }
1111 
1112 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
1113 {
1114     NBDClient *client;
1115     NBDClientNewData *data = g_new(NBDClientNewData, 1);
1116 
1117     client = g_malloc0(sizeof(NBDClient));
1118     client->refcount = 1;
1119     client->exp = exp;
1120     client->sock = csock;
1121     client->can_read = true;
1122     client->close = close_fn;
1123 
1124     data->client = client;
1125     data->co = qemu_coroutine_create(nbd_co_client_start);
1126     qemu_coroutine_enter(data->co, data);
1127 }
1128