xref: /qemu/nbd/server.c (revision 2c533c54)
1 /*
2  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
3  *
4  *  Network Block Device Server Side
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; under version 2 of the License.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "qemu/osdep.h"
20 #include "qapi/error.h"
21 #include "nbd-internal.h"
22 
23 static int system_errno_to_nbd_errno(int err)
24 {
25     switch (err) {
26     case 0:
27         return NBD_SUCCESS;
28     case EPERM:
29     case EROFS:
30         return NBD_EPERM;
31     case EIO:
32         return NBD_EIO;
33     case ENOMEM:
34         return NBD_ENOMEM;
35 #ifdef EDQUOT
36     case EDQUOT:
37 #endif
38     case EFBIG:
39     case ENOSPC:
40         return NBD_ENOSPC;
41     case EINVAL:
42     default:
43         return NBD_EINVAL;
44     }
45 }
46 
47 /* Definitions for opaque data types */
48 
49 typedef struct NBDRequest NBDRequest;
50 
51 struct NBDRequest {
52     QSIMPLEQ_ENTRY(NBDRequest) entry;
53     NBDClient *client;
54     uint8_t *data;
55     bool complete;
56 };
57 
58 struct NBDExport {
59     int refcount;
60     void (*close)(NBDExport *exp);
61 
62     BlockBackend *blk;
63     char *name;
64     off_t dev_offset;
65     off_t size;
66     uint32_t nbdflags;
67     QTAILQ_HEAD(, NBDClient) clients;
68     QTAILQ_ENTRY(NBDExport) next;
69 
70     AioContext *ctx;
71 
72     Notifier eject_notifier;
73 };
74 
75 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
76 
77 struct NBDClient {
78     int refcount;
79     void (*close)(NBDClient *client);
80 
81     NBDExport *exp;
82     QCryptoTLSCreds *tlscreds;
83     char *tlsaclname;
84     QIOChannelSocket *sioc; /* The underlying data channel */
85     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
86 
87     Coroutine *recv_coroutine;
88 
89     CoMutex send_lock;
90     Coroutine *send_coroutine;
91 
92     bool can_read;
93 
94     QTAILQ_ENTRY(NBDClient) next;
95     int nb_requests;
96     bool closing;
97 };
98 
99 /* That's all folks */
100 
101 static void nbd_set_handlers(NBDClient *client);
102 static void nbd_unset_handlers(NBDClient *client);
103 static void nbd_update_can_read(NBDClient *client);
104 
105 static gboolean nbd_negotiate_continue(QIOChannel *ioc,
106                                        GIOCondition condition,
107                                        void *opaque)
108 {
109     qemu_coroutine_enter(opaque);
110     return TRUE;
111 }
112 
113 static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size)
114 {
115     ssize_t ret;
116     guint watch;
117 
118     assert(qemu_in_coroutine());
119     /* Negotiation are always in main loop. */
120     watch = qio_channel_add_watch(ioc,
121                                   G_IO_IN,
122                                   nbd_negotiate_continue,
123                                   qemu_coroutine_self(),
124                                   NULL);
125     ret = read_sync(ioc, buffer, size);
126     g_source_remove(watch);
127     return ret;
128 
129 }
130 
131 static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size)
132 {
133     ssize_t ret;
134     guint watch;
135 
136     assert(qemu_in_coroutine());
137     /* Negotiation are always in main loop. */
138     watch = qio_channel_add_watch(ioc,
139                                   G_IO_OUT,
140                                   nbd_negotiate_continue,
141                                   qemu_coroutine_self(),
142                                   NULL);
143     ret = write_sync(ioc, buffer, size);
144     g_source_remove(watch);
145     return ret;
146 }
147 
148 static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size)
149 {
150     ssize_t ret, dropped = size;
151     uint8_t *buffer = g_malloc(MIN(65536, size));
152 
153     while (size > 0) {
154         ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size));
155         if (ret < 0) {
156             g_free(buffer);
157             return ret;
158         }
159 
160         assert(ret <= size);
161         size -= ret;
162     }
163 
164     g_free(buffer);
165     return dropped;
166 }
167 
168 /* Basic flow for negotiation
169 
170    Server         Client
171    Negotiate
172 
173    or
174 
175    Server         Client
176    Negotiate #1
177                   Option
178    Negotiate #2
179 
180    ----
181 
182    followed by
183 
184    Server         Client
185                   Request
186    Response
187                   Request
188    Response
189                   ...
190    ...
191                   Request (type == 2)
192 
193 */
194 
195 static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
196 {
197     uint64_t magic;
198     uint32_t len;
199 
200     TRACE("Reply opt=%" PRIx32 " type=%" PRIx32, type, opt);
201 
202     magic = cpu_to_be64(NBD_REP_MAGIC);
203     if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
204         LOG("write failed (rep magic)");
205         return -EINVAL;
206     }
207     opt = cpu_to_be32(opt);
208     if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
209         LOG("write failed (rep opt)");
210         return -EINVAL;
211     }
212     type = cpu_to_be32(type);
213     if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
214         LOG("write failed (rep type)");
215         return -EINVAL;
216     }
217     len = cpu_to_be32(0);
218     if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
219         LOG("write failed (rep data length)");
220         return -EINVAL;
221     }
222     return 0;
223 }
224 
225 static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp)
226 {
227     uint64_t magic, name_len;
228     uint32_t opt, type, len;
229 
230     TRACE("Advertising export name '%s'", exp->name ? exp->name : "");
231     name_len = strlen(exp->name);
232     magic = cpu_to_be64(NBD_REP_MAGIC);
233     if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
234         LOG("write failed (magic)");
235         return -EINVAL;
236      }
237     opt = cpu_to_be32(NBD_OPT_LIST);
238     if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
239         LOG("write failed (opt)");
240         return -EINVAL;
241     }
242     type = cpu_to_be32(NBD_REP_SERVER);
243     if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
244         LOG("write failed (reply type)");
245         return -EINVAL;
246     }
247     len = cpu_to_be32(name_len + sizeof(len));
248     if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
249         LOG("write failed (length)");
250         return -EINVAL;
251     }
252     len = cpu_to_be32(name_len);
253     if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
254         LOG("write failed (length)");
255         return -EINVAL;
256     }
257     if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) {
258         LOG("write failed (buffer)");
259         return -EINVAL;
260     }
261     return 0;
262 }
263 
264 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
265 {
266     NBDExport *exp;
267 
268     if (length) {
269         if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
270             return -EIO;
271         }
272         return nbd_negotiate_send_rep(client->ioc,
273                                       NBD_REP_ERR_INVALID, NBD_OPT_LIST);
274     }
275 
276     /* For each export, send a NBD_REP_SERVER reply. */
277     QTAILQ_FOREACH(exp, &exports, next) {
278         if (nbd_negotiate_send_rep_list(client->ioc, exp)) {
279             return -EINVAL;
280         }
281     }
282     /* Finish with a NBD_REP_ACK. */
283     return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST);
284 }
285 
286 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
287 {
288     int rc = -EINVAL;
289     char name[NBD_MAX_NAME_SIZE + 1];
290 
291     /* Client sends:
292         [20 ..  xx]   export name (length bytes)
293      */
294     TRACE("Checking length");
295     if (length >= sizeof(name)) {
296         LOG("Bad length received");
297         goto fail;
298     }
299     if (nbd_negotiate_read(client->ioc, name, length) != length) {
300         LOG("read failed");
301         goto fail;
302     }
303     name[length] = '\0';
304 
305     TRACE("Client requested export '%s'", name);
306 
307     client->exp = nbd_export_find(name);
308     if (!client->exp) {
309         LOG("export not found");
310         goto fail;
311     }
312 
313     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
314     nbd_export_get(client->exp);
315     rc = 0;
316 fail:
317     return rc;
318 }
319 
320 
321 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
322                                                  uint32_t length)
323 {
324     QIOChannel *ioc;
325     QIOChannelTLS *tioc;
326     struct NBDTLSHandshakeData data = { 0 };
327 
328     TRACE("Setting up TLS");
329     ioc = client->ioc;
330     if (length) {
331         if (nbd_negotiate_drop_sync(ioc, length) != length) {
332             return NULL;
333         }
334         nbd_negotiate_send_rep(ioc, NBD_REP_ERR_INVALID, NBD_OPT_STARTTLS);
335         return NULL;
336     }
337 
338     if (nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK,
339                                NBD_OPT_STARTTLS) < 0) {
340         return NULL;
341     }
342 
343     tioc = qio_channel_tls_new_server(ioc,
344                                       client->tlscreds,
345                                       client->tlsaclname,
346                                       NULL);
347     if (!tioc) {
348         return NULL;
349     }
350 
351     TRACE("Starting TLS handshake");
352     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
353     qio_channel_tls_handshake(tioc,
354                               nbd_tls_handshake,
355                               &data,
356                               NULL);
357 
358     if (!data.complete) {
359         g_main_loop_run(data.loop);
360     }
361     g_main_loop_unref(data.loop);
362     if (data.error) {
363         object_unref(OBJECT(tioc));
364         error_free(data.error);
365         return NULL;
366     }
367 
368     return QIO_CHANNEL(tioc);
369 }
370 
371 
372 static int nbd_negotiate_options(NBDClient *client)
373 {
374     uint32_t flags;
375     bool fixedNewstyle = false;
376 
377     /* Client sends:
378         [ 0 ..   3]   client flags
379 
380         [ 0 ..   7]   NBD_OPTS_MAGIC
381         [ 8 ..  11]   NBD option
382         [12 ..  15]   Data length
383         ...           Rest of request
384 
385         [ 0 ..   7]   NBD_OPTS_MAGIC
386         [ 8 ..  11]   Second NBD option
387         [12 ..  15]   Data length
388         ...           Rest of request
389     */
390 
391     if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) !=
392         sizeof(flags)) {
393         LOG("read failed");
394         return -EIO;
395     }
396     TRACE("Checking client flags");
397     be32_to_cpus(&flags);
398     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
399         TRACE("Client supports fixed newstyle handshake");
400         fixedNewstyle = true;
401         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
402     }
403     if (flags != 0) {
404         TRACE("Unknown client flags 0x%" PRIx32 " received", flags);
405         return -EIO;
406     }
407 
408     while (1) {
409         int ret;
410         uint32_t clientflags, length;
411         uint64_t magic;
412 
413         if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) !=
414             sizeof(magic)) {
415             LOG("read failed");
416             return -EINVAL;
417         }
418         TRACE("Checking opts magic");
419         if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
420             LOG("Bad magic received");
421             return -EINVAL;
422         }
423 
424         if (nbd_negotiate_read(client->ioc, &clientflags,
425                                sizeof(clientflags)) != sizeof(clientflags)) {
426             LOG("read failed");
427             return -EINVAL;
428         }
429         clientflags = be32_to_cpu(clientflags);
430 
431         if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) !=
432             sizeof(length)) {
433             LOG("read failed");
434             return -EINVAL;
435         }
436         length = be32_to_cpu(length);
437 
438         TRACE("Checking option 0x%" PRIx32, clientflags);
439         if (client->tlscreds &&
440             client->ioc == (QIOChannel *)client->sioc) {
441             QIOChannel *tioc;
442             if (!fixedNewstyle) {
443                 TRACE("Unsupported option 0x%" PRIx32, clientflags);
444                 return -EINVAL;
445             }
446             switch (clientflags) {
447             case NBD_OPT_STARTTLS:
448                 tioc = nbd_negotiate_handle_starttls(client, length);
449                 if (!tioc) {
450                     return -EIO;
451                 }
452                 object_unref(OBJECT(client->ioc));
453                 client->ioc = QIO_CHANNEL(tioc);
454                 break;
455 
456             case NBD_OPT_EXPORT_NAME:
457                 /* No way to return an error to client, so drop connection */
458                 TRACE("Option 0x%x not permitted before TLS", clientflags);
459                 return -EINVAL;
460 
461             default:
462                 TRACE("Option 0x%" PRIx32 " not permitted before TLS",
463                       clientflags);
464                 if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
465                     return -EIO;
466                 }
467                 ret = nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_TLS_REQD,
468                                              clientflags);
469                 if (ret < 0) {
470                     return ret;
471                 }
472                 break;
473             }
474         } else if (fixedNewstyle) {
475             switch (clientflags) {
476             case NBD_OPT_LIST:
477                 ret = nbd_negotiate_handle_list(client, length);
478                 if (ret < 0) {
479                     return ret;
480                 }
481                 break;
482 
483             case NBD_OPT_ABORT:
484                 return -EINVAL;
485 
486             case NBD_OPT_EXPORT_NAME:
487                 return nbd_negotiate_handle_export_name(client, length);
488 
489             case NBD_OPT_STARTTLS:
490                 if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
491                     return -EIO;
492                 }
493                 if (client->tlscreds) {
494                     TRACE("TLS already enabled");
495                     ret = nbd_negotiate_send_rep(client->ioc,
496                                                  NBD_REP_ERR_INVALID,
497                                                  clientflags);
498                 } else {
499                     TRACE("TLS not configured");
500                     ret = nbd_negotiate_send_rep(client->ioc,
501                                                  NBD_REP_ERR_POLICY,
502                                                  clientflags);
503                 }
504                 if (ret < 0) {
505                     return ret;
506                 }
507                 break;
508             default:
509                 TRACE("Unsupported option 0x%" PRIx32, clientflags);
510                 if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
511                     return -EIO;
512                 }
513                 ret = nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP,
514                                              clientflags);
515                 if (ret < 0) {
516                     return ret;
517                 }
518                 break;
519             }
520         } else {
521             /*
522              * If broken new-style we should drop the connection
523              * for anything except NBD_OPT_EXPORT_NAME
524              */
525             switch (clientflags) {
526             case NBD_OPT_EXPORT_NAME:
527                 return nbd_negotiate_handle_export_name(client, length);
528 
529             default:
530                 TRACE("Unsupported option 0x%" PRIx32, clientflags);
531                 return -EINVAL;
532             }
533         }
534     }
535 }
536 
537 typedef struct {
538     NBDClient *client;
539     Coroutine *co;
540 } NBDClientNewData;
541 
542 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
543 {
544     NBDClient *client = data->client;
545     char buf[8 + 8 + 8 + 128];
546     int rc;
547     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
548                          NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
549     bool oldStyle;
550 
551     /* Old style negotiation header without options
552         [ 0 ..   7]   passwd       ("NBDMAGIC")
553         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
554         [16 ..  23]   size
555         [24 ..  25]   server flags (0)
556         [26 ..  27]   export flags
557         [28 .. 151]   reserved     (0)
558 
559        New style negotiation header with options
560         [ 0 ..   7]   passwd       ("NBDMAGIC")
561         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
562         [16 ..  17]   server flags (0)
563         ....options sent....
564         [18 ..  25]   size
565         [26 ..  27]   export flags
566         [28 .. 151]   reserved     (0)
567      */
568 
569     qio_channel_set_blocking(client->ioc, false, NULL);
570     rc = -EINVAL;
571 
572     TRACE("Beginning negotiation.");
573     memset(buf, 0, sizeof(buf));
574     memcpy(buf, "NBDMAGIC", 8);
575 
576     oldStyle = client->exp != NULL && !client->tlscreds;
577     if (oldStyle) {
578         assert ((client->exp->nbdflags & ~65535) == 0);
579         TRACE("advertising size %" PRIu64 " and flags %x",
580               client->exp->size, client->exp->nbdflags | myflags);
581         stq_be_p(buf + 8, NBD_CLIENT_MAGIC);
582         stq_be_p(buf + 16, client->exp->size);
583         stw_be_p(buf + 26, client->exp->nbdflags | myflags);
584     } else {
585         stq_be_p(buf + 8, NBD_OPTS_MAGIC);
586         stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE);
587     }
588 
589     if (oldStyle) {
590         if (client->tlscreds) {
591             TRACE("TLS cannot be enabled with oldstyle protocol");
592             goto fail;
593         }
594         if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) {
595             LOG("write failed");
596             goto fail;
597         }
598     } else {
599         if (nbd_negotiate_write(client->ioc, buf, 18) != 18) {
600             LOG("write failed");
601             goto fail;
602         }
603         rc = nbd_negotiate_options(client);
604         if (rc != 0) {
605             LOG("option negotiation failed");
606             goto fail;
607         }
608 
609         assert ((client->exp->nbdflags & ~65535) == 0);
610         TRACE("advertising size %" PRIu64 " and flags %x",
611               client->exp->size, client->exp->nbdflags | myflags);
612         stq_be_p(buf + 18, client->exp->size);
613         stw_be_p(buf + 26, client->exp->nbdflags | myflags);
614         if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) !=
615             sizeof(buf) - 18) {
616             LOG("write failed");
617             goto fail;
618         }
619     }
620 
621     TRACE("Negotiation succeeded.");
622     rc = 0;
623 fail:
624     return rc;
625 }
626 
627 static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request)
628 {
629     uint8_t buf[NBD_REQUEST_SIZE];
630     uint32_t magic;
631     ssize_t ret;
632 
633     ret = read_sync(ioc, buf, sizeof(buf));
634     if (ret < 0) {
635         return ret;
636     }
637 
638     if (ret != sizeof(buf)) {
639         LOG("read failed");
640         return -EINVAL;
641     }
642 
643     /* Request
644        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
645        [ 4 ..  7]   type    (0 == READ, 1 == WRITE)
646        [ 8 .. 15]   handle
647        [16 .. 23]   from
648        [24 .. 27]   len
649      */
650 
651     magic = ldl_be_p(buf);
652     request->type   = ldl_be_p(buf + 4);
653     request->handle = ldq_be_p(buf + 8);
654     request->from   = ldq_be_p(buf + 16);
655     request->len    = ldl_be_p(buf + 24);
656 
657     TRACE("Got request: { magic = 0x%" PRIx32 ", .type = %" PRIx32
658           ", from = %" PRIu64 " , len = %" PRIu32 " }",
659           magic, request->type, request->from, request->len);
660 
661     if (magic != NBD_REQUEST_MAGIC) {
662         LOG("invalid magic (got 0x%" PRIx32 ")", magic);
663         return -EINVAL;
664     }
665     return 0;
666 }
667 
668 static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
669 {
670     uint8_t buf[NBD_REPLY_SIZE];
671     ssize_t ret;
672 
673     reply->error = system_errno_to_nbd_errno(reply->error);
674 
675     TRACE("Sending response to client: { .error = %" PRId32
676           ", handle = %" PRIu64 " }",
677           reply->error, reply->handle);
678 
679     /* Reply
680        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
681        [ 4 ..  7]    error   (0 == no error)
682        [ 7 .. 15]    handle
683      */
684     stl_be_p(buf, NBD_REPLY_MAGIC);
685     stl_be_p(buf + 4, reply->error);
686     stq_be_p(buf + 8, reply->handle);
687 
688     ret = write_sync(ioc, buf, sizeof(buf));
689     if (ret < 0) {
690         return ret;
691     }
692 
693     if (ret != sizeof(buf)) {
694         LOG("writing to socket failed");
695         return -EINVAL;
696     }
697     return 0;
698 }
699 
/* Upper bound on the requests one client may have in flight at a time. */
#define MAX_NBD_REQUESTS 16
701 
702 void nbd_client_get(NBDClient *client)
703 {
704     client->refcount++;
705 }
706 
707 void nbd_client_put(NBDClient *client)
708 {
709     if (--client->refcount == 0) {
710         /* The last reference should be dropped by client->close,
711          * which is called by client_close.
712          */
713         assert(client->closing);
714 
715         nbd_unset_handlers(client);
716         object_unref(OBJECT(client->sioc));
717         object_unref(OBJECT(client->ioc));
718         if (client->tlscreds) {
719             object_unref(OBJECT(client->tlscreds));
720         }
721         g_free(client->tlsaclname);
722         if (client->exp) {
723             QTAILQ_REMOVE(&client->exp->clients, client, next);
724             nbd_export_put(client->exp);
725         }
726         g_free(client);
727     }
728 }
729 
730 static void client_close(NBDClient *client)
731 {
732     if (client->closing) {
733         return;
734     }
735 
736     client->closing = true;
737 
738     /* Force requests to finish.  They will drop their own references,
739      * then we'll close the socket and free the NBDClient.
740      */
741     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
742                          NULL);
743 
744     /* Also tell the client, so that they release their reference.  */
745     if (client->close) {
746         client->close(client);
747     }
748 }
749 
750 static NBDRequest *nbd_request_get(NBDClient *client)
751 {
752     NBDRequest *req;
753 
754     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
755     client->nb_requests++;
756     nbd_update_can_read(client);
757 
758     req = g_new0(NBDRequest, 1);
759     nbd_client_get(client);
760     req->client = client;
761     return req;
762 }
763 
764 static void nbd_request_put(NBDRequest *req)
765 {
766     NBDClient *client = req->client;
767 
768     if (req->data) {
769         qemu_vfree(req->data);
770     }
771     g_free(req);
772 
773     client->nb_requests--;
774     nbd_update_can_read(client);
775     nbd_client_put(client);
776 }
777 
778 static void blk_aio_attached(AioContext *ctx, void *opaque)
779 {
780     NBDExport *exp = opaque;
781     NBDClient *client;
782 
783     TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
784 
785     exp->ctx = ctx;
786 
787     QTAILQ_FOREACH(client, &exp->clients, next) {
788         nbd_set_handlers(client);
789     }
790 }
791 
792 static void blk_aio_detach(void *opaque)
793 {
794     NBDExport *exp = opaque;
795     NBDClient *client;
796 
797     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
798 
799     QTAILQ_FOREACH(client, &exp->clients, next) {
800         nbd_unset_handlers(client);
801     }
802 
803     exp->ctx = NULL;
804 }
805 
806 static void nbd_eject_notifier(Notifier *n, void *data)
807 {
808     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
809     nbd_export_close(exp);
810 }
811 
812 NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
813                           uint32_t nbdflags, void (*close)(NBDExport *),
814                           Error **errp)
815 {
816     NBDExport *exp = g_malloc0(sizeof(NBDExport));
817     exp->refcount = 1;
818     QTAILQ_INIT(&exp->clients);
819     exp->blk = blk;
820     exp->dev_offset = dev_offset;
821     exp->nbdflags = nbdflags;
822     exp->size = size < 0 ? blk_getlength(blk) : size;
823     if (exp->size < 0) {
824         error_setg_errno(errp, -exp->size,
825                          "Failed to determine the NBD export's length");
826         goto fail;
827     }
828     exp->size -= exp->size % BDRV_SECTOR_SIZE;
829 
830     exp->close = close;
831     exp->ctx = blk_get_aio_context(blk);
832     blk_ref(blk);
833     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
834 
835     exp->eject_notifier.notify = nbd_eject_notifier;
836     blk_add_remove_bs_notifier(blk, &exp->eject_notifier);
837 
838     /*
839      * NBD exports are used for non-shared storage migration.  Make sure
840      * that BDRV_O_INACTIVE is cleared and the image is ready for write
841      * access since the export could be available before migration handover.
842      */
843     aio_context_acquire(exp->ctx);
844     blk_invalidate_cache(blk, NULL);
845     aio_context_release(exp->ctx);
846     return exp;
847 
848 fail:
849     g_free(exp);
850     return NULL;
851 }
852 
853 NBDExport *nbd_export_find(const char *name)
854 {
855     NBDExport *exp;
856     QTAILQ_FOREACH(exp, &exports, next) {
857         if (strcmp(name, exp->name) == 0) {
858             return exp;
859         }
860     }
861 
862     return NULL;
863 }
864 
865 void nbd_export_set_name(NBDExport *exp, const char *name)
866 {
867     if (exp->name == name) {
868         return;
869     }
870 
871     nbd_export_get(exp);
872     if (exp->name != NULL) {
873         g_free(exp->name);
874         exp->name = NULL;
875         QTAILQ_REMOVE(&exports, exp, next);
876         nbd_export_put(exp);
877     }
878     if (name != NULL) {
879         nbd_export_get(exp);
880         exp->name = g_strdup(name);
881         QTAILQ_INSERT_TAIL(&exports, exp, next);
882     }
883     nbd_export_put(exp);
884 }
885 
886 void nbd_export_close(NBDExport *exp)
887 {
888     NBDClient *client, *next;
889 
890     nbd_export_get(exp);
891     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
892         client_close(client);
893     }
894     nbd_export_set_name(exp, NULL);
895     nbd_export_put(exp);
896 }
897 
898 void nbd_export_get(NBDExport *exp)
899 {
900     assert(exp->refcount > 0);
901     exp->refcount++;
902 }
903 
904 void nbd_export_put(NBDExport *exp)
905 {
906     assert(exp->refcount > 0);
907     if (exp->refcount == 1) {
908         nbd_export_close(exp);
909     }
910 
911     if (--exp->refcount == 0) {
912         assert(exp->name == NULL);
913 
914         if (exp->close) {
915             exp->close(exp);
916         }
917 
918         if (exp->blk) {
919             notifier_remove(&exp->eject_notifier);
920             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
921                                             blk_aio_detach, exp);
922             blk_unref(exp->blk);
923             exp->blk = NULL;
924         }
925 
926         g_free(exp);
927     }
928 }
929 
930 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
931 {
932     return exp->blk;
933 }
934 
935 void nbd_export_close_all(void)
936 {
937     NBDExport *exp, *next;
938 
939     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
940         nbd_export_close(exp);
941     }
942 }
943 
944 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
945                                  int len)
946 {
947     NBDClient *client = req->client;
948     ssize_t rc, ret;
949 
950     g_assert(qemu_in_coroutine());
951     qemu_co_mutex_lock(&client->send_lock);
952     client->send_coroutine = qemu_coroutine_self();
953     nbd_set_handlers(client);
954 
955     if (!len) {
956         rc = nbd_send_reply(client->ioc, reply);
957     } else {
958         qio_channel_set_cork(client->ioc, true);
959         rc = nbd_send_reply(client->ioc, reply);
960         if (rc >= 0) {
961             ret = write_sync(client->ioc, req->data, len);
962             if (ret != len) {
963                 rc = -EIO;
964             }
965         }
966         qio_channel_set_cork(client->ioc, false);
967     }
968 
969     client->send_coroutine = NULL;
970     nbd_set_handlers(client);
971     qemu_co_mutex_unlock(&client->send_lock);
972     return rc;
973 }
974 
975 /* Collect a client request.  Return 0 if request looks valid, -EAGAIN
976  * to keep trying the collection, -EIO to drop connection right away,
977  * and any other negative value to report an error to the client
978  * (although the caller may still need to disconnect after reporting
979  * the error).  */
980 static ssize_t nbd_co_receive_request(NBDRequest *req,
981                                       struct nbd_request *request)
982 {
983     NBDClient *client = req->client;
984     uint32_t command;
985     ssize_t rc;
986 
987     g_assert(qemu_in_coroutine());
988     client->recv_coroutine = qemu_coroutine_self();
989     nbd_update_can_read(client);
990 
991     rc = nbd_receive_request(client->ioc, request);
992     if (rc < 0) {
993         if (rc != -EAGAIN) {
994             rc = -EIO;
995         }
996         goto out;
997     }
998 
999     TRACE("Decoding type");
1000 
1001     command = request->type & NBD_CMD_MASK_COMMAND;
1002     if (command != NBD_CMD_WRITE) {
1003         /* No payload, we are ready to read the next request.  */
1004         req->complete = true;
1005     }
1006 
1007     if (command == NBD_CMD_DISC) {
1008         /* Special case: we're going to disconnect without a reply,
1009          * whether or not flags, from, or len are bogus */
1010         TRACE("Request type is DISCONNECT");
1011         rc = -EIO;
1012         goto out;
1013     }
1014 
1015     /* Check for sanity in the parameters, part 1.  Defer as many
1016      * checks as possible until after reading any NBD_CMD_WRITE
1017      * payload, so we can try and keep the connection alive.  */
1018     if ((request->from + request->len) < request->from) {
1019         LOG("integer overflow detected, you're probably being attacked");
1020         rc = -EINVAL;
1021         goto out;
1022     }
1023 
1024     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
1025         if (request->len > NBD_MAX_BUFFER_SIZE) {
1026             LOG("len (%" PRIu32" ) is larger than max len (%u)",
1027                 request->len, NBD_MAX_BUFFER_SIZE);
1028             rc = -EINVAL;
1029             goto out;
1030         }
1031 
1032         req->data = blk_try_blockalign(client->exp->blk, request->len);
1033         if (req->data == NULL) {
1034             rc = -ENOMEM;
1035             goto out;
1036         }
1037     }
1038     if (command == NBD_CMD_WRITE) {
1039         TRACE("Reading %" PRIu32 " byte(s)", request->len);
1040 
1041         if (read_sync(client->ioc, req->data, request->len) != request->len) {
1042             LOG("reading from socket failed");
1043             rc = -EIO;
1044             goto out;
1045         }
1046         req->complete = true;
1047     }
1048 
1049     /* Sanity checks, part 2. */
1050     if (request->from + request->len > client->exp->size) {
1051         LOG("operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
1052             ", Size: %" PRIu64, request->from, request->len,
1053             (uint64_t)client->exp->size);
1054         rc = command == NBD_CMD_WRITE ? -ENOSPC : -EINVAL;
1055         goto out;
1056     }
1057     if (request->type & ~NBD_CMD_MASK_COMMAND & ~NBD_CMD_FLAG_FUA) {
1058         LOG("unsupported flags (got 0x%x)",
1059             request->type & ~NBD_CMD_MASK_COMMAND);
1060         return -EINVAL;
1061     }
1062 
1063     rc = 0;
1064 
1065 out:
1066     client->recv_coroutine = NULL;
1067     nbd_update_can_read(client);
1068 
1069     return rc;
1070 }
1071 
1072 static void nbd_trip(void *opaque)
1073 {
1074     NBDClient *client = opaque;
1075     NBDExport *exp = client->exp;
1076     NBDRequest *req;
1077     struct nbd_request request;
1078     struct nbd_reply reply;
1079     ssize_t ret;
1080     uint32_t command;
1081     int flags;
1082 
1083     TRACE("Reading request.");
1084     if (client->closing) {
1085         return;
1086     }
1087 
1088     req = nbd_request_get(client);
1089     ret = nbd_co_receive_request(req, &request);
1090     if (ret == -EAGAIN) {
1091         goto done;
1092     }
1093     if (ret == -EIO) {
1094         goto out;
1095     }
1096 
1097     reply.handle = request.handle;
1098     reply.error = 0;
1099 
1100     if (ret < 0) {
1101         reply.error = -ret;
1102         goto error_reply;
1103     }
1104     command = request.type & NBD_CMD_MASK_COMMAND;
1105 
1106     if (client->closing) {
1107         /*
1108          * The client may be closed when we are blocked in
1109          * nbd_co_receive_request()
1110          */
1111         goto done;
1112     }
1113 
1114     switch (command) {
1115     case NBD_CMD_READ:
1116         TRACE("Request type is READ");
1117 
1118         if (request.type & NBD_CMD_FLAG_FUA) {
1119             ret = blk_co_flush(exp->blk);
1120             if (ret < 0) {
1121                 LOG("flush failed");
1122                 reply.error = -ret;
1123                 goto error_reply;
1124             }
1125         }
1126 
1127         ret = blk_pread(exp->blk, request.from + exp->dev_offset,
1128                         req->data, request.len);
1129         if (ret < 0) {
1130             LOG("reading from file failed");
1131             reply.error = -ret;
1132             goto error_reply;
1133         }
1134 
1135         TRACE("Read %" PRIu32" byte(s)", request.len);
1136         if (nbd_co_send_reply(req, &reply, request.len) < 0)
1137             goto out;
1138         break;
1139     case NBD_CMD_WRITE:
1140         TRACE("Request type is WRITE");
1141 
1142         if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
1143             TRACE("Server is read-only, return error");
1144             reply.error = EROFS;
1145             goto error_reply;
1146         }
1147 
1148         TRACE("Writing to device");
1149 
1150         flags = 0;
1151         if (request.type & NBD_CMD_FLAG_FUA) {
1152             flags |= BDRV_REQ_FUA;
1153         }
1154         ret = blk_pwrite(exp->blk, request.from + exp->dev_offset,
1155                          req->data, request.len, flags);
1156         if (ret < 0) {
1157             LOG("writing to file failed");
1158             reply.error = -ret;
1159             goto error_reply;
1160         }
1161 
1162         if (nbd_co_send_reply(req, &reply, 0) < 0) {
1163             goto out;
1164         }
1165         break;
1166 
1167     case NBD_CMD_DISC:
1168         /* unreachable, thanks to special case in nbd_co_receive_request() */
1169         abort();
1170 
1171     case NBD_CMD_FLUSH:
1172         TRACE("Request type is FLUSH");
1173 
1174         ret = blk_co_flush(exp->blk);
1175         if (ret < 0) {
1176             LOG("flush failed");
1177             reply.error = -ret;
1178         }
1179         if (nbd_co_send_reply(req, &reply, 0) < 0) {
1180             goto out;
1181         }
1182         break;
1183     case NBD_CMD_TRIM:
1184         TRACE("Request type is TRIM");
1185         ret = blk_co_pdiscard(exp->blk, request.from + exp->dev_offset,
1186                               request.len);
1187         if (ret < 0) {
1188             LOG("discard failed");
1189             reply.error = -ret;
1190         }
1191         if (nbd_co_send_reply(req, &reply, 0) < 0) {
1192             goto out;
1193         }
1194         break;
1195     default:
1196         LOG("invalid request type (%" PRIu32 ") received", request.type);
1197         reply.error = EINVAL;
1198     error_reply:
1199         /* We must disconnect after NBD_CMD_WRITE if we did not
1200          * read the payload.
1201          */
1202         if (nbd_co_send_reply(req, &reply, 0) < 0 || !req->complete) {
1203             goto out;
1204         }
1205         break;
1206     }
1207 
1208     TRACE("Request/Reply complete");
1209 
1210 done:
1211     nbd_request_put(req);
1212     return;
1213 
1214 out:
1215     nbd_request_put(req);
1216     client_close(client);
1217 }
1218 
1219 static void nbd_read(void *opaque)
1220 {
1221     NBDClient *client = opaque;
1222 
1223     if (client->recv_coroutine) {
1224         qemu_coroutine_enter(client->recv_coroutine);
1225     } else {
1226         qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
1227     }
1228 }
1229 
1230 static void nbd_restart_write(void *opaque)
1231 {
1232     NBDClient *client = opaque;
1233 
1234     qemu_coroutine_enter(client->send_coroutine);
1235 }
1236 
1237 static void nbd_set_handlers(NBDClient *client)
1238 {
1239     if (client->exp && client->exp->ctx) {
1240         aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
1241                            true,
1242                            client->can_read ? nbd_read : NULL,
1243                            client->send_coroutine ? nbd_restart_write : NULL,
1244                            client);
1245     }
1246 }
1247 
1248 static void nbd_unset_handlers(NBDClient *client)
1249 {
1250     if (client->exp && client->exp->ctx) {
1251         aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
1252                            true, NULL, NULL, NULL);
1253     }
1254 }
1255 
1256 static void nbd_update_can_read(NBDClient *client)
1257 {
1258     bool can_read = client->recv_coroutine ||
1259                     client->nb_requests < MAX_NBD_REQUESTS;
1260 
1261     if (can_read != client->can_read) {
1262         client->can_read = can_read;
1263         nbd_set_handlers(client);
1264 
1265         /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
1266          * in nbd_set_handlers() will have taken care of that */
1267     }
1268 }
1269 
1270 static coroutine_fn void nbd_co_client_start(void *opaque)
1271 {
1272     NBDClientNewData *data = opaque;
1273     NBDClient *client = data->client;
1274     NBDExport *exp = client->exp;
1275 
1276     if (exp) {
1277         nbd_export_get(exp);
1278     }
1279     if (nbd_negotiate(data)) {
1280         client_close(client);
1281         goto out;
1282     }
1283     qemu_co_mutex_init(&client->send_lock);
1284     nbd_set_handlers(client);
1285 
1286     if (exp) {
1287         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1288     }
1289 out:
1290     g_free(data);
1291 }
1292 
1293 void nbd_client_new(NBDExport *exp,
1294                     QIOChannelSocket *sioc,
1295                     QCryptoTLSCreds *tlscreds,
1296                     const char *tlsaclname,
1297                     void (*close_fn)(NBDClient *))
1298 {
1299     NBDClient *client;
1300     NBDClientNewData *data = g_new(NBDClientNewData, 1);
1301 
1302     client = g_malloc0(sizeof(NBDClient));
1303     client->refcount = 1;
1304     client->exp = exp;
1305     client->tlscreds = tlscreds;
1306     if (tlscreds) {
1307         object_ref(OBJECT(client->tlscreds));
1308     }
1309     client->tlsaclname = g_strdup(tlsaclname);
1310     client->sioc = sioc;
1311     object_ref(OBJECT(client->sioc));
1312     client->ioc = QIO_CHANNEL(sioc);
1313     object_ref(OBJECT(client->ioc));
1314     client->can_read = true;
1315     client->close = close_fn;
1316 
1317     data->client = client;
1318     data->co = qemu_coroutine_create(nbd_co_client_start, data);
1319     qemu_coroutine_enter(data->co);
1320 }
1321