1 /*
2  * virnetclient.c: generic network RPC client
3  *
4  * Copyright (C) 2006-2014 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library.  If not, see
18  * <http://www.gnu.org/licenses/>.
19  */
20 
21 #include <config.h>
22 
23 #include <unistd.h>
24 #include <signal.h>
25 #include <fcntl.h>
26 
27 #include "virnetclient.h"
28 #include "virnetsocket.h"
29 #include "virkeepalive.h"
30 #include "viralloc.h"
31 #include "virthread.h"
32 #include "virfile.h"
33 #include "virlog.h"
34 #include "virutil.h"
35 #include "virerror.h"
36 #include "virprobe.h"
37 #include "virstring.h"
38 #include "vireventglibwatch.h"
39 
40 #define VIR_FROM_THIS VIR_FROM_RPC
41 
42 VIR_LOG_INIT("rpc.netclient");
43 
44 typedef struct _virNetClientCall virNetClientCall;
45 
/*
 * Values stored in virNetClientCall.mode.
 * NOTE(review): the code that drives these states is outside this
 * view; the per-value meanings below are inferred from the names
 * and should be confirmed.
 */
enum {
    VIR_NET_CLIENT_MODE_WAIT_TX,  /* presumably: request not fully sent yet */
    VIR_NET_CLIENT_MODE_WAIT_RX,  /* presumably: sent, waiting for the reply */
    VIR_NET_CLIENT_MODE_COMPLETE, /* presumably: call finished */
};
51 
/*
 * String names for the virNetClientProxy enum.
 * NOTE(review): the enum itself is declared outside this file view;
 * the strings are assumed to be listed in the same order as the
 * enum values (auto, netcat, native) - confirm against the header.
 */
VIR_ENUM_IMPL(virNetClientProxy,
              VIR_NET_CLIENT_PROXY_LAST,
              "auto", "netcat", "native");
55 
/*
 * One queued RPC call. Calls are chained through 'next' into a singly
 * linked list (see virNetClientCallQueue / virNetClientCallRemove).
 */
struct _virNetClientCall {
    /* NOTE(review): presumably one of VIR_NET_CLIENT_MODE_* - confirm */
    int mode;

    /* Message associated with this call */
    virNetMessage *msg;
    /* NOTE(review): flag semantics are inferred from the names; the
     * code that sets/reads them is not visible here - confirm */
    bool expectReply;
    bool nonBlock;
    bool haveThread;

    /* Condition variable belonging to this call */
    virCond cond;

    /* Next call in the list, NULL for the tail / unlinked calls */
    virNetClientCall *next;
};
68 
69 
/*
 * State of one RPC client connection. The object is lockable
 * (virObjectLockable parent) and reference counted.
 */
struct _virNetClient {
    virObjectLockable parent;

    /* Underlying socket; reset to NULL by virNetClientCloseLocked() */
    virNetSocket *sock;
    /* Set once virNetClientRegisterAsyncIO() installed the read callback */
    bool asyncIO;

    /* TLS session, NULL when TLS is not in use */
    virNetTLSSession *tls;
    /* Copy of the hostname given at creation time (may be NULL);
     * passed to virNetTLSSessionNew() when TLS is enabled */
    char *hostname;

    /* Registered programs; each entry is unref'd on dispose */
    virNetClientProgram **programs;
    size_t nprograms;

    /* For incoming message packets */
    virNetMessage msg;

#if WITH_SASL
    /* SASL session, NULL when SASL is not in use */
    virNetSASLSession *sasl;
#endif

    /* Private GLib main loop and its context, created in virNetClientNew() */
    GMainLoop *eventLoop;
    GMainContext *eventCtx;

    /*
     * List of calls currently waiting for dispatch.
     * The calls should all have threads waiting for
     * them, except possibly the first call in the list,
     * which might be a partially sent non-blocking call.
     */
    virNetClientCall *waitDispatch;
    /* True if a thread holds the buck */
    bool haveTheBuck;

    size_t nstreams;
    virNetClientStream **streams;

    /* Keepalive helper, NULL until virNetClientRegisterKeepAlive() */
    virKeepAlive *keepalive;
    /* Close request state recorded by virNetClientMarkClose() */
    bool wantClose;
    int closeReason;
    virErrorPtr error;

    /* Close notification: closeCb is invoked from virNetClientCloseLocked(),
     * closeFf releases closeOpaque when the client is disposed */
    virNetClientCloseFunc closeCb;
    void *closeOpaque;
    virFreeCallback closeFf;
};
114 
115 
116 static virClass *virNetClientClass;
117 static void virNetClientDispose(void *obj);
118 
virNetClientOnceInit(void)119 static int virNetClientOnceInit(void)
120 {
121     if (!VIR_CLASS_NEW(virNetClient, virClassForObjectLockable()))
122         return -1;
123 
124     return 0;
125 }
126 
127 VIR_ONCE_GLOBAL_INIT(virNetClient);
128 
129 static void virNetClientIOEventLoopPassTheBuck(virNetClient *client,
130                                                virNetClientCall *thiscall);
131 static int virNetClientQueueNonBlocking(virNetClient *client,
132                                         virNetMessage *msg);
133 static void virNetClientCloseInternal(virNetClient *client,
134                                       int reason);
135 
136 
/*
 * Install the close notification for @client.
 *
 * @cb:     function invoked when the connection gets closed
 * @opaque: user data handed to @cb
 * @ff:     function releasing @opaque when the client is disposed
 *
 * Any previously stored callback triple is simply overwritten.
 */
void virNetClientSetCloseCallback(virNetClient *client,
                                  virNetClientCloseFunc cb,
                                  void *opaque,
                                  virFreeCallback ff)
{
    virObjectLock(client);

    client->closeFf = ff;
    client->closeOpaque = opaque;
    client->closeCb = cb;

    virObjectUnlock(client);
}
148 
149 
150 static void virNetClientIncomingEvent(virNetSocket *sock,
151                                       int events,
152                                       void *opaque);
153 
154 /* Append a call to the end of the list */
virNetClientCallQueue(virNetClientCall ** head,virNetClientCall * call)155 static void virNetClientCallQueue(virNetClientCall **head,
156                                   virNetClientCall *call)
157 {
158     virNetClientCall *tmp = *head;
159     while (tmp && tmp->next)
160         tmp = tmp->next;
161     if (tmp)
162         tmp->next = call;
163     else
164         *head = call;
165     call->next = NULL;
166 }
167 
#if 0
/*
 * Obtain a call from the head of the list.
 *
 * Detaches and returns the first call of the list rooted at @head,
 * or returns NULL (leaving the list untouched) when it is empty.
 */
static virNetClientCall *virNetClientCallServe(virNetClientCall **head)
{
    virNetClientCall *tmp = *head;

    /* Empty list: nothing to detach, and tmp must not be dereferenced */
    if (!tmp)
        return NULL;

    *head = tmp->next;
    tmp->next = NULL;
    return tmp;
}
#endif
181 
182 /* Remove a call from anywhere in the list */
virNetClientCallRemove(virNetClientCall ** head,virNetClientCall * call)183 static void virNetClientCallRemove(virNetClientCall **head,
184                                    virNetClientCall *call)
185 {
186     virNetClientCall *tmp = *head;
187     virNetClientCall *prev = NULL;
188     while (tmp) {
189         if (tmp == call) {
190             if (prev)
191                 prev->next = g_steal_pointer(&tmp->next);
192             else
193                 *head = g_steal_pointer(&tmp->next);
194             return;
195         }
196         prev = tmp;
197         tmp = tmp->next;
198     }
199 }
200 
/* Predicate called with a queued call and the caller-supplied opaque
 * pointer; returns true if the call matches */
typedef bool (*virNetClientCallPredicate)(virNetClientCall *call, void *opaque);
203 
204 /* Remove a list of calls from the list based on a predicate */
virNetClientCallRemovePredicate(virNetClientCall ** head,virNetClientCallPredicate pred,void * opaque)205 static void virNetClientCallRemovePredicate(virNetClientCall **head,
206                                             virNetClientCallPredicate pred,
207                                             void *opaque)
208 {
209     virNetClientCall *tmp = *head;
210     virNetClientCall *prev = NULL;
211     while (tmp) {
212         virNetClientCall *next = tmp->next;
213         tmp->next = NULL; /* Temp unlink */
214         if (pred(tmp, opaque)) {
215             if (prev)
216                 prev->next = next;
217             else
218                 *head = next;
219         } else {
220             tmp->next = next; /* Reverse temp unlink */
221             prev = tmp;
222         }
223         tmp = next;
224     }
225 }
226 
227 /* Returns true if the predicate matches at least one call in the list */
virNetClientCallMatchPredicate(virNetClientCall * head,virNetClientCallPredicate pred,void * opaque)228 static bool virNetClientCallMatchPredicate(virNetClientCall *head,
229                                            virNetClientCallPredicate pred,
230                                            void *opaque)
231 {
232     virNetClientCall *tmp = head;
233     while (tmp) {
234         if (pred(tmp, opaque))
235             return true;
236         tmp = tmp->next;
237     }
238     return false;
239 }
240 
241 
242 bool
virNetClientKeepAliveIsSupported(virNetClient * client)243 virNetClientKeepAliveIsSupported(virNetClient *client)
244 {
245     bool supported;
246 
247     virObjectLock(client);
248     supported = !!client->keepalive;
249     virObjectUnlock(client);
250 
251     return supported;
252 }
253 
254 int
virNetClientKeepAliveStart(virNetClient * client,int interval,unsigned int count)255 virNetClientKeepAliveStart(virNetClient *client,
256                            int interval,
257                            unsigned int count)
258 {
259     int ret;
260 
261     virObjectLock(client);
262     ret = virKeepAliveStart(client->keepalive, interval, count);
263     virObjectUnlock(client);
264 
265     return ret;
266 }
267 
268 void
virNetClientKeepAliveStop(virNetClient * client)269 virNetClientKeepAliveStop(virNetClient *client)
270 {
271     virObjectLock(client);
272     virKeepAliveStop(client->keepalive);
273     virObjectUnlock(client);
274 }
275 
276 static void
virNetClientKeepAliveDeadCB(void * opaque)277 virNetClientKeepAliveDeadCB(void *opaque)
278 {
279     virNetClientCloseInternal(opaque, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
280 }
281 
282 static int
virNetClientKeepAliveSendCB(void * opaque,virNetMessage * msg)283 virNetClientKeepAliveSendCB(void *opaque,
284                             virNetMessage *msg)
285 {
286     int ret;
287 
288     ret = virNetClientSendNonBlock(opaque, msg);
289     if (ret != -1 && ret != 1)
290         virNetMessageFree(msg);
291     return ret;
292 }
293 
virNetClientNew(virNetSocket * sock,const char * hostname)294 static virNetClient *virNetClientNew(virNetSocket *sock,
295                                        const char *hostname)
296 {
297     virNetClient *client = NULL;
298 
299     if (virNetClientInitialize() < 0)
300         goto error;
301 
302     if (!(client = virObjectLockableNew(virNetClientClass)))
303         goto error;
304 
305     client->sock = g_steal_pointer(&sock);
306 
307     client->eventCtx = g_main_context_new();
308     client->eventLoop = g_main_loop_new(client->eventCtx, FALSE);
309 
310     client->hostname = g_strdup(hostname);
311 
312     PROBE(RPC_CLIENT_NEW,
313           "client=%p sock=%p",
314           client, client->sock);
315     return client;
316 
317  error:
318     virObjectUnref(client);
319     virObjectUnref(sock);
320     return NULL;
321 }
322 
323 /*
324  * Check whether the specified SSH key exists.
325  *
326  * Return -1 on error, 0 if it does not exist, and 1 if it does exist.
327  */
328 static int
virNetClientCheckKeyExists(const char * homedir,const char * name,char ** retPath)329 virNetClientCheckKeyExists(const char *homedir,
330                            const char *name,
331                            char **retPath)
332 {
333     char *path;
334 
335     path = g_strdup_printf("%s/.ssh/%s", homedir, name);
336 
337     if (!(virFileExists(path))) {
338         VIR_FREE(path);
339         return 0;
340     }
341 
342     *retPath = path;
343     return 1;
344 }
345 
346 /*
347  * Detect the default SSH key, if existing.
348  *
349  * Return -1 on error, 0 if it does not exist, and 1 if it does exist.
350  */
351 static int
virNetClientFindDefaultSshKey(const char * homedir,char ** retPath)352 virNetClientFindDefaultSshKey(const char *homedir, char **retPath)
353 {
354     size_t i;
355 
356     const char *keys[] = { "identity", "id_dsa", "id_ecdsa", "id_ed25519", "id_rsa" };
357 
358     for (i = 0; i < G_N_ELEMENTS(keys); ++i) {
359         int ret = virNetClientCheckKeyExists(homedir, keys[i], retPath);
360         if (ret != 0)
361             return ret;
362     }
363 
364     return 0;
365 }
366 
367 
virNetClientNewUNIX(const char * path,const char * spawnDaemonPath)368 virNetClient *virNetClientNewUNIX(const char *path,
369                                   const char *spawnDaemonPath)
370 {
371     virNetSocket *sock;
372 
373     if (virNetSocketNewConnectUNIX(path, spawnDaemonPath, &sock) < 0)
374         return NULL;
375 
376     return virNetClientNew(sock, NULL);
377 }
378 
379 
virNetClientNewTCP(const char * nodename,const char * service,int family)380 virNetClient *virNetClientNewTCP(const char *nodename,
381                                    const char *service,
382                                    int family)
383 {
384     virNetSocket *sock;
385 
386     if (virNetSocketNewConnectTCP(nodename, service,
387                                   family,
388                                   &sock) < 0)
389         return NULL;
390 
391     return virNetClientNew(sock, nodename);
392 }
393 
394 
395 /*
396  * The SSH Server uses shell to spawn the command we give
397  * it.  Our command then invokes shell again. Thus we need
398  * to apply two levels of escaping, so that commands with
399  * whitespace in their path get correctly interpreted.
400  */
401 static char *
virNetClientDoubleEscapeShell(const char * str)402 virNetClientDoubleEscapeShell(const char *str)
403 {
404     virBuffer buf = VIR_BUFFER_INITIALIZER;
405     g_autofree char *tmp = NULL;
406 
407     virBufferEscapeShell(&buf, str);
408 
409     tmp = virBufferContentAndReset(&buf);
410 
411     virBufferEscapeShell(&buf, tmp);
412 
413     return virBufferContentAndReset(&buf);
414 }
415 
416 char *
virNetClientSSHHelperCommand(virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly)417 virNetClientSSHHelperCommand(virNetClientProxy proxy,
418                              const char *netcatPath,
419                              const char *socketPath,
420                              const char *driverURI,
421                              bool readonly)
422 {
423     g_autofree char *netcatPathSafe = virNetClientDoubleEscapeShell(netcatPath ? netcatPath : "nc");
424     g_autofree char *driverURISafe = virNetClientDoubleEscapeShell(driverURI);
425     g_autofree char *nccmd = NULL;
426     g_autofree char *helpercmd = NULL;
427 
428     /* If user gave a 'netcat' path in the URI, we must
429      * assume they want the legacy 'nc' based proxy, not
430      * our new virt-ssh-helper
431      */
432     if (proxy == VIR_NET_CLIENT_PROXY_AUTO &&
433         netcatPath != NULL) {
434         proxy = VIR_NET_CLIENT_PROXY_NETCAT;
435     }
436 
437     nccmd = g_strdup_printf(
438         "if '%s' -q 2>&1 | grep \"requires an argument\" >/dev/null 2>&1; then "
439             "ARG=-q0;"
440         "else "
441             "ARG=;"
442         "fi;"
443         "'%s' $ARG -U %s",
444         netcatPathSafe, netcatPathSafe, socketPath);
445 
446     helpercmd = g_strdup_printf("virt-ssh-helper%s'%s'",
447                                 readonly ? " -r " : " ",
448                                 driverURISafe);
449 
450     switch (proxy) {
451     case VIR_NET_CLIENT_PROXY_AUTO:
452         return g_strdup_printf("sh -c 'which virt-ssh-helper 1>/dev/null 2>&1; "
453                                "if test $? = 0; then "
454                                "    %s; "
455                                "else"
456                                "    %s; "
457                                "fi'", helpercmd, nccmd);
458 
459     case VIR_NET_CLIENT_PROXY_NETCAT:
460         return g_strdup_printf("sh -c '%s'", nccmd);
461 
462     case VIR_NET_CLIENT_PROXY_NATIVE:
463         if (netcatPath) {
464             virReportError(VIR_ERR_INVALID_ARG, "%s",
465                            _("netcat path not valid with native proxy mode"));
466             return NULL;
467         }
468         return g_strdup_printf("sh -c '%s'", helpercmd);
469 
470     case VIR_NET_CLIENT_PROXY_LAST:
471     default:
472         virReportEnumRangeError(virNetClientProxy, proxy);
473         return NULL;
474     }
475 }
476 
477 
/*
 * Assign VAL to VAR when VAR is currently NULL/zero.
 *
 * Wrapped in do { } while (0) so that "DEFAULT_VALUE(x, y);" is a
 * single statement and stays safe inside unbraced if/else bodies.
 * VAR is evaluated twice, so it must not have side effects.
 */
#define DEFAULT_VALUE(VAR, VAL) \
    do { \
        if (!(VAR)) \
            (VAR) = (VAL); \
    } while (0)
481 
virNetClientNewSSH(const char * nodename,const char * service,const char * binary,const char * username,bool noTTY,bool noVerify,const char * keyfile,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly)482 virNetClient *virNetClientNewSSH(const char *nodename,
483                                    const char *service,
484                                    const char *binary,
485                                    const char *username,
486                                    bool noTTY,
487                                    bool noVerify,
488                                    const char *keyfile,
489                                    virNetClientProxy proxy,
490                                    const char *netcatPath,
491                                    const char *socketPath,
492                                    const char *driverURI,
493                                    bool readonly)
494 {
495     virNetSocket *sock;
496     g_autofree char *command = NULL;
497 
498     if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
499                                                  driverURI, readonly)))
500         return NULL;
501 
502     if (virNetSocketNewConnectSSH(nodename, service, binary, username, noTTY,
503                                   noVerify, keyfile, command, &sock) < 0)
504         return NULL;
505 
506     return virNetClientNew(sock, NULL);
507 }
508 
virNetClientNewLibSSH2(const char * host,const char * port,int family,const char * username,const char * privkeyPath,const char * knownHostsPath,const char * knownHostsVerify,const char * authMethods,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly,virConnectAuthPtr authPtr,virURI * uri)509 virNetClient *virNetClientNewLibSSH2(const char *host,
510                                        const char *port,
511                                        int family,
512                                        const char *username,
513                                        const char *privkeyPath,
514                                        const char *knownHostsPath,
515                                        const char *knownHostsVerify,
516                                        const char *authMethods,
517                                        virNetClientProxy proxy,
518                                        const char *netcatPath,
519                                        const char *socketPath,
520                                        const char *driverURI,
521                                        bool readonly,
522                                        virConnectAuthPtr authPtr,
523                                        virURI *uri)
524 {
525     virNetSocket *sock = NULL;
526     g_autofree char *command = NULL;
527     g_autofree char *homedir = NULL;
528     g_autofree char *confdir = NULL;
529     g_autofree char *knownhosts = NULL;
530     g_autofree char *privkey = NULL;
531 
532     /* Use default paths for known hosts an public keys if not provided */
533     if (knownHostsPath) {
534         knownhosts = g_strdup(knownHostsPath);
535     } else {
536         confdir = virGetUserConfigDirectory();
537         knownhosts = g_strdup_printf("%s/known_hosts", confdir);
538     }
539 
540     if (privkeyPath) {
541         privkey = g_strdup(privkeyPath);
542     } else {
543         homedir = virGetUserDirectory();
544         if (virNetClientFindDefaultSshKey(homedir, &privkey) < 0)
545             return NULL;
546     }
547 
548     if (!authMethods) {
549         if (privkey)
550             authMethods = "agent,privkey,password,keyboard-interactive";
551         else
552             authMethods = "agent,password,keyboard-interactive";
553     }
554 
555     DEFAULT_VALUE(host, "localhost");
556     DEFAULT_VALUE(port, "22");
557     DEFAULT_VALUE(username, "root");
558     DEFAULT_VALUE(knownHostsVerify, "normal");
559 
560     if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
561                                                  driverURI, readonly)))
562         return NULL;
563 
564     if (virNetSocketNewConnectLibSSH2(host, port,
565                                       family,
566                                       username, privkey,
567                                       knownhosts, knownHostsVerify, authMethods,
568                                       command, authPtr, uri, &sock) != 0)
569         return NULL;
570 
571    return virNetClientNew(sock, NULL);
572 }
573 
virNetClientNewLibssh(const char * host,const char * port,int family,const char * username,const char * privkeyPath,const char * knownHostsPath,const char * knownHostsVerify,const char * authMethods,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly,virConnectAuthPtr authPtr,virURI * uri)574 virNetClient *virNetClientNewLibssh(const char *host,
575                                       const char *port,
576                                       int family,
577                                       const char *username,
578                                       const char *privkeyPath,
579                                       const char *knownHostsPath,
580                                       const char *knownHostsVerify,
581                                       const char *authMethods,
582                                       virNetClientProxy proxy,
583                                       const char *netcatPath,
584                                       const char *socketPath,
585                                       const char *driverURI,
586                                       bool readonly,
587                                       virConnectAuthPtr authPtr,
588                                       virURI *uri)
589 {
590     virNetSocket *sock = NULL;
591     g_autofree char *command = NULL;
592     g_autofree char *homedir = NULL;
593     g_autofree char *confdir = NULL;
594     g_autofree char *knownhosts = NULL;
595     g_autofree char *privkey = NULL;
596 
597     /* Use default paths for known hosts an public keys if not provided */
598     if (knownHostsPath) {
599         knownhosts = g_strdup(knownHostsPath);
600     } else {
601         confdir = virGetUserConfigDirectory();
602         knownhosts = g_strdup_printf("%s/known_hosts", confdir);
603     }
604 
605     if (privkeyPath) {
606         privkey = g_strdup(privkeyPath);
607     } else {
608         homedir = virGetUserDirectory();
609         if (virNetClientFindDefaultSshKey(homedir, &privkey) < 0)
610             return NULL;
611     }
612 
613     if (!authMethods) {
614         if (privkey)
615             authMethods = "agent,privkey,password,keyboard-interactive";
616         else
617             authMethods = "agent,password,keyboard-interactive";
618     }
619 
620     DEFAULT_VALUE(host, "localhost");
621     DEFAULT_VALUE(port, "22");
622     DEFAULT_VALUE(username, "root");
623     DEFAULT_VALUE(knownHostsVerify, "normal");
624 
625     if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
626                                                  driverURI, readonly)))
627         return NULL;
628 
629     if (virNetSocketNewConnectLibssh(host, port,
630                                      family,
631                                      username, privkey,
632                                      knownhosts, knownHostsVerify, authMethods,
633                                      command, authPtr, uri, &sock) != 0)
634         return NULL;
635 
636     return virNetClientNew(sock, NULL);
637 }
638 #undef DEFAULT_VALUE
639 
virNetClientNewExternal(const char ** cmdargv)640 virNetClient *virNetClientNewExternal(const char **cmdargv)
641 {
642     virNetSocket *sock;
643 
644     if (virNetSocketNewConnectExternal(cmdargv, &sock) < 0)
645         return NULL;
646 
647     return virNetClientNew(sock, NULL);
648 }
649 
650 
/*
 * Enable asynchronous I/O on @client by registering a readable-event
 * callback on its socket. Calling it again once enabled is a no-op.
 *
 * The callback holds its own reference on @client, released through
 * virObjectFreeCallback.
 *
 * Returns 0 on success, -1 with an error reported on failure.
 */
int virNetClientRegisterAsyncIO(virNetClient *client)
{
    if (client->asyncIO)
        return 0;

    /* Set up a callback to listen on the socket data; the reference
     * taken here belongs to the callback */
    virObjectRef(client);
    if (virNetSocketAddIOCallback(client->sock,
                                  VIR_EVENT_HANDLE_READABLE,
                                  virNetClientIncomingEvent,
                                  client,
                                  virObjectFreeCallback) >= 0) {
        client->asyncIO = true;
        return 0;
    }

    /* Registration failed: give back the callback's reference */
    virObjectUnref(client);
    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                   _("Unable to register async IO callback"));
    return -1;
}
672 
673 
/*
 * Attach a keepalive object to @client. Calling it again once a
 * keepalive is registered is a no-op.
 *
 * The keepalive protocol consists of async messages, so async I/O
 * must have been enabled first.
 *
 * Returns 0 on success, -1 on failure.
 */
int virNetClientRegisterKeepAlive(virNetClient *client)
{
    virKeepAlive *ka = NULL;

    if (client->keepalive)
        return 0;

    /* Keepalive protocol consists of async messages so it can only be used
     * if the client supports them */
    if (!client->asyncIO) {
        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
                       _("Unable to enable keepalives without async IO support"));
        return -1;
    }

    ka = virKeepAliveNew(-1, 0, client,
                         virNetClientKeepAliveSendCB,
                         virNetClientKeepAliveDeadCB,
                         virObjectFreeCallback);
    if (!ka)
        return -1;

    /* keepalive object has a reference to client */
    virObjectRef(client);
    client->keepalive = ka;

    return 0;
}
701 
702 
/*
 * Return the file descriptor reported by virNetSocketGetFD() for the
 * client's socket, read under the client lock.
 */
int virNetClientGetFD(virNetClient *client)
{
    int sockfd;

    virObjectLock(client);
    sockfd = virNetSocketGetFD(client->sock);
    virObjectUnlock(client);

    return sockfd;
}
711 
712 
/*
 * Return the result of virNetSocketDupFD() for the client's socket,
 * passing @cloexec through; performed under the client lock.
 */
int virNetClientDupFD(virNetClient *client, bool cloexec)
{
    int dupfd;

    virObjectLock(client);
    dupfd = virNetSocketDupFD(client->sock, cloexec);
    virObjectUnlock(client);

    return dupfd;
}
721 
722 
virNetClientHasPassFD(virNetClient * client)723 bool virNetClientHasPassFD(virNetClient *client)
724 {
725     bool hasPassFD;
726     virObjectLock(client);
727     hasPassFD = virNetSocketHasPassFD(client->sock);
728     virObjectUnlock(client);
729     return hasPassFD;
730 }
731 
732 
/*
 * Class dispose handler for virNetClient (see the static forward
 * declaration near the top of the file).
 *
 * Releases what the client owns: the close callback's opaque data,
 * registered programs, the private event loop and context, the
 * hostname copy, the socket (after removing its I/O callback), the
 * TLS/SASL sessions and the incoming message buffer.
 */
void virNetClientDispose(void *obj)
{
    virNetClient *client = obj;
    size_t i;

    PROBE(RPC_CLIENT_DISPOSE,
          "client=%p", client);

    /* Let the owner of the close callback free its opaque data */
    if (client->closeFf)
        client->closeFf(client->closeOpaque);

    for (i = 0; i < client->nprograms; i++)
        virObjectUnref(client->programs[i]);
    g_free(client->programs);

    g_main_loop_unref(client->eventLoop);
    g_main_context_unref(client->eventCtx);

    g_free(client->hostname);

    /* sock is NULL if the client was already closed */
    if (client->sock)
        virNetSocketRemoveIOCallback(client->sock);
    virObjectUnref(client->sock);
    virObjectUnref(client->tls);
#if WITH_SASL
    virObjectUnref(client->sasl);
#endif

    virNetMessageClear(&client->msg);
}
763 
764 
765 static void
virNetClientMarkClose(virNetClient * client,int reason)766 virNetClientMarkClose(virNetClient *client,
767                       int reason)
768 {
769     VIR_DEBUG("client=%p, reason=%d", client, reason);
770 
771     if (client->sock)
772         virNetSocketRemoveIOCallback(client->sock);
773 
774     /* Don't override reason that's already set. */
775     if (!client->wantClose) {
776         if (!client->error)
777             client->error = virSaveLastError();
778         client->wantClose = true;
779         client->closeReason = reason;
780     }
781 }
782 
783 
/*
 * Tear down the connection of @client. The lock/unlock sequence
 * below implies the caller holds the client lock on entry; it is
 * held again on return.
 *
 * Drops the socket and TLS/SASL sessions, detaches the keepalive
 * object, clears the pending close request and saved error, and
 * finally stops the keepalive and runs the close callback with the
 * client lock temporarily released. Does nothing if the socket is
 * already gone.
 */
static void
virNetClientCloseLocked(virNetClient *client)
{
    virKeepAlive *ka;

    VIR_DEBUG("client=%p, sock=%p, reason=%d", client, client->sock, client->closeReason);

    /* Already closed */
    if (!client->sock)
        return;

    virObjectUnref(client->sock);
    client->sock = NULL;
    virObjectUnref(client->tls);
    client->tls = NULL;
#if WITH_SASL
    virObjectUnref(client->sasl);
    client->sasl = NULL;
#endif
    /* Detach the keepalive now; it is stopped below without the lock */
    ka = g_steal_pointer(&client->keepalive);
    client->wantClose = false;

    virFreeError(client->error);
    client->error = NULL;

    if (ka || client->closeCb) {
        /* Snapshot the callback data while still holding the lock */
        virNetClientCloseFunc closeCb = client->closeCb;
        void *closeOpaque = client->closeOpaque;
        int closeReason = client->closeReason;
        /* Keep the client alive while it is unlocked */
        virObjectRef(client);
        virObjectUnlock(client);

        if (ka) {
            virKeepAliveStop(ka);
            virObjectUnref(ka);
        }
        if (closeCb)
            closeCb(client, closeReason, closeOpaque);

        virObjectLock(client);
        virObjectUnref(client);
    }
}
826 
827 
/*
 * Request that @client be closed for @reason.
 *
 * Does nothing for a NULL client, a client without a socket, or one
 * that already has a close pending.
 *
 * NOTE(review): the socket / wantClose checks are made before the
 * client lock is taken - confirm this unlocked read is intentional.
 */
static void virNetClientCloseInternal(virNetClient *client,
                                      int reason)
{
    VIR_DEBUG("client=%p wantclose=%d", client, client ? client->wantClose : false);

    if (!client || !client->sock || client->wantClose)
        return;

    virObjectLock(client);

    virNetClientMarkClose(client, reason);

    /* If there is a thread polling for data on the socket, wake the thread up
     * otherwise try to pass the buck to a possibly waiting thread. If no
     * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
     * queue and close the client because we set client->wantClose.
     */
    if (!client->haveTheBuck)
        virNetClientIOEventLoopPassTheBuck(client, NULL);
    else
        g_main_loop_quit(client->eventLoop);

    virObjectUnlock(client);
}
857 
858 
/*
 * Close @client on behalf of the API user; the close is recorded
 * with the "client" close reason. A NULL @client is accepted.
 */
void virNetClientClose(virNetClient *client)
{
    const int reason = VIR_CONNECT_CLOSE_REASON_CLIENT;

    virNetClientCloseInternal(client, reason);
}
863 
864 
865 #if WITH_SASL
/*
 * Attach @sasl to @client: the client takes its own reference and
 * the session is also installed on the client's socket.
 */
void virNetClientSetSASLSession(virNetClient *client,
                                virNetSASLSession *sasl)
{
    virNetSASLSession *session;

    virObjectLock(client);

    session = virObjectRef(sasl);
    client->sasl = session;
    virNetSocketSetSASLSession(client->sock, session);

    virObjectUnlock(client);
}
874 #endif
875 
876 
877 static gboolean
878 virNetClientIOEventTLS(int fd,
879                        GIOCondition ev,
880                        gpointer opaque);
881 
882 static gboolean
virNetClientTLSHandshake(virNetClient * client)883 virNetClientTLSHandshake(virNetClient *client)
884 {
885     g_autoptr(GSource) G_GNUC_UNUSED source = NULL;
886     GIOCondition ev;
887     int ret;
888 
889     ret = virNetTLSSessionHandshake(client->tls);
890 
891     if (ret <= 0)
892         return FALSE;
893 
894     if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
895         VIR_NET_TLS_HANDSHAKE_RECVING)
896         ev = G_IO_IN;
897     else
898         ev = G_IO_OUT;
899 
900     source = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
901                                         ev,
902                                         client->eventCtx,
903                                         virNetClientIOEventTLS, client, NULL);
904 
905     return TRUE;
906 }
907 
908 
909 static gboolean
virNetClientIOEventTLS(int fd G_GNUC_UNUSED,GIOCondition ev G_GNUC_UNUSED,gpointer opaque)910 virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
911                        GIOCondition ev G_GNUC_UNUSED,
912                        gpointer opaque)
913 {
914     virNetClient *client = opaque;
915 
916     if (!virNetClientTLSHandshake(client))
917         g_main_loop_quit(client->eventLoop);
918 
919     return G_SOURCE_REMOVE;
920 }
921 
922 
923 static gboolean
virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,GIOCondition ev G_GNUC_UNUSED,gpointer opaque)924 virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
925                               GIOCondition ev G_GNUC_UNUSED,
926                               gpointer opaque)
927 {
928     virNetClient *client = opaque;
929 
930     g_main_loop_quit(client->eventLoop);
931 
932     return G_SOURCE_REMOVE;
933 }
934 
935 
/**
 * virNetClientSetTLSSession:
 * @client: RPC client to switch over to TLS
 * @tls: TLS context used to create and verify the session
 *
 * Creates a TLS session for the client's hostname, attaches it to the
 * socket and runs the handshake on the client's main loop. Afterwards
 * the peer certificate is checked and a single confirmation byte is
 * read from the server; anything other than '\1' is treated as a
 * rejection. The client lock is held for the whole operation.
 *
 * On any failure the client's TLS session reference is dropped and
 * client->tls is reset to NULL.
 *
 * Returns 0 on success, -1 on error.
 */
int virNetClientSetTLSSession(virNetClient *client,
                              virNetTLSContext *tls)
{
    char confirm[1];
    int nread;
    g_autoptr(GSource) G_GNUC_UNUSED watch = NULL;

#ifndef WIN32
    sigset_t savedmask, blocked;

    /* Signals kept away from the main loop while it is running */
    sigemptyset(&blocked);
# ifdef SIGWINCH
    sigaddset(&blocked, SIGWINCH);
# endif
# ifdef SIGCHLD
    sigaddset(&blocked, SIGCHLD);
# endif
    sigaddset(&blocked, SIGPIPE);
#endif /* !WIN32 */

    virObjectLock(client);

    client->tls = virNetTLSSessionNew(tls, client->hostname);
    if (!client->tls)
        goto error;

    virNetSocketSetTLSSession(client->sock, client->tls);

    /* The handshake callbacks report failure through the thread's last
     * error, so start from a clean slate and inspect it afterwards. */
    virResetLastError();
    if (virNetClientTLSHandshake(client)) {
#ifndef WIN32
        /* Keep SIGWINCH (curses programs, RHBZ#567931), SIGCHLD and
         * SIGPIPE from interrupting the wait; the previous mask is
         * restored as soon as the loop returns.
         */
        ignore_value(pthread_sigmask(SIG_BLOCK, &blocked, &savedmask));
#endif /* !WIN32 */

        g_main_loop_run(client->eventLoop);

#ifndef WIN32
        ignore_value(pthread_sigmask(SIG_SETMASK, &savedmask, NULL));
#endif /* !WIN32 */
    }

    if (virGetLastErrorCode() != VIR_ERR_OK)
        goto error;

    if (virNetTLSContextCheckCertificate(tls, client->tls) < 0)
        goto error;

    /* The server now verifies _our_ certificate, IP address, etc.
     * and answers with a '\1' byte if it is satisfied. Wait until
     * the socket becomes readable before trying to fetch it.
     */
    watch = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
                                       G_IO_IN,
                                       client->eventCtx,
                                       virNetClientIOEventTLSConfirm, client, NULL);

#ifndef WIN32
    /* Same signal blocking as for the handshake above */
    ignore_value(pthread_sigmask(SIG_BLOCK, &blocked, &savedmask));
#endif /* !WIN32 */

    g_main_loop_run(client->eventLoop);

#ifndef WIN32
    ignore_value(pthread_sigmask(SIG_SETMASK, &savedmask, NULL));
#endif /* !WIN32 */

    nread = virNetTLSSessionRead(client->tls, confirm, 1);
    if (nread < 0 && errno != ENOMSG) {
        virReportSystemError(errno, "%s",
                             _("Unable to read TLS confirmation"));
        goto error;
    }
    if (!(nread == 1 && confirm[0] == '\1')) {
        virReportError(VIR_ERR_RPC, "%s",
                       _("server verification (of our certificate or IP "
                         "address) failed"));
        goto error;
    }

    virObjectUnlock(client);
    return 0;

 error:
    virObjectUnref(client->tls);
    client->tls = NULL;
    virObjectUnlock(client);
    return -1;
}
1033 
/**
 * virNetClientIsEncrypted:
 * @client: RPC client to query
 *
 * Returns true if the client currently has a TLS session attached or,
 * when built with SASL support, a SASL session attached. The check is
 * made under the client lock.
 */
bool virNetClientIsEncrypted(virNetClient *client)
{
    bool encrypted;

    virObjectLock(client);

    encrypted = client->tls != NULL;
#if WITH_SASL
    encrypted = encrypted || client->sasl != NULL;
#endif

    virObjectUnlock(client);

    return encrypted;
}
1047 
1048 
/**
 * virNetClientIsOpen:
 * @client: RPC client to query, may be NULL
 *
 * Returns true only if @client is non-NULL, still has a socket and has
 * not been marked for closing. The state is read under the client lock.
 */
bool virNetClientIsOpen(virNetClient *client)
{
    bool isOpen = false;

    if (client) {
        virObjectLock(client);
        isOpen = client->sock != NULL && !client->wantClose;
        virObjectUnlock(client);
    }

    return isOpen;
}
1061 
1062 
/**
 * virNetClientAddProgram:
 * @client: RPC client to extend
 * @prog: program to register
 *
 * Grows the client's program array by one slot and stores a new
 * reference to @prog in it, under the client lock.
 *
 * Always returns 0.
 */
int virNetClientAddProgram(virNetClient *client,
                           virNetClientProgram *prog)
{
    size_t slot;

    virObjectLock(client);

    VIR_EXPAND_N(client->programs, client->nprograms, 1);
    slot = client->nprograms - 1;
    client->programs[slot] = virObjectRef(prog);

    virObjectUnlock(client);
    return 0;
}
1074 
1075 
/**
 * virNetClientAddStream:
 * @client: RPC client to extend
 * @st: stream to register
 *
 * Grows the client's stream array by one slot and stores a new
 * reference to @st in it, under the client lock.
 *
 * Always returns 0.
 */
int virNetClientAddStream(virNetClient *client,
                          virNetClientStream *st)
{
    size_t slot;

    virObjectLock(client);

    VIR_EXPAND_N(client->streams, client->nstreams, 1);
    slot = client->nstreams - 1;
    client->streams[slot] = virObjectRef(st);

    virObjectUnlock(client);
    return 0;
}
1087 
1088 
/**
 * virNetClientRemoveStream:
 * @client: RPC client to update
 * @st: stream to unregister
 *
 * Looks for the first entry in the client's stream array that is
 * exactly @st. If one is found it is deleted from the array and one
 * reference on @st is released; otherwise nothing happens. The client
 * lock is held throughout.
 */
void virNetClientRemoveStream(virNetClient *client,
                              virNetClientStream *st)
{
    size_t pos = 0;

    virObjectLock(client);

    while (pos < client->nstreams && client->streams[pos] != st)
        pos++;

    if (pos < client->nstreams) {
        VIR_DELETE_ELEMENT(client->streams, pos, client->nstreams);
        virObjectUnref(st);
    }

    virObjectUnlock(client);
}
1109 
1110 
/**
 * virNetClientLocalAddrStringSASL:
 * @client: RPC client to query
 *
 * Returns whatever virNetSocketLocalAddrStringSASL() yields for the
 * client's socket. The client lock is not taken.
 */
const char *virNetClientLocalAddrStringSASL(virNetClient *client)
{
    virNetSocket *sock = client->sock;

    return virNetSocketLocalAddrStringSASL(sock);
}
1115 
/**
 * virNetClientRemoteAddrStringSASL:
 * @client: RPC client to query
 *
 * Returns whatever virNetSocketRemoteAddrStringSASL() yields for the
 * client's socket. The client lock is not taken.
 */
const char *virNetClientRemoteAddrStringSASL(virNetClient *client)
{
    virNetSocket *sock = client->sock;

    return virNetSocketRemoteAddrStringSASL(sock);
}
1120 
/**
 * virNetClientGetTLSKeySize:
 * @client: RPC client to query
 *
 * Returns the value of virNetTLSSessionGetKeySize() for the client's
 * TLS session, or 0 when no TLS session is attached. The session is
 * inspected under the client lock.
 */
int virNetClientGetTLSKeySize(virNetClient *client)
{
    int keySize;

    virObjectLock(client);
    keySize = client->tls ? virNetTLSSessionGetKeySize(client->tls) : 0;
    virObjectUnlock(client);

    return keySize;
}
1130 
1131 static int
virNetClientCallDispatchReply(virNetClient * client)1132 virNetClientCallDispatchReply(virNetClient *client)
1133 {
1134     virNetClientCall *thecall;
1135 
1136     /* Ok, definitely got an RPC reply now find
1137        out which waiting call is associated with it */
1138     thecall = client->waitDispatch;
1139     while (thecall &&
1140            !(thecall->msg->header.prog == client->msg.header.prog &&
1141              thecall->msg->header.vers == client->msg.header.vers &&
1142              thecall->msg->header.serial == client->msg.header.serial))
1143         thecall = thecall->next;
1144 
1145     if (!thecall) {
1146         virReportError(VIR_ERR_RPC,
1147                        _("no call waiting for reply with prog %d vers %d serial %d"),
1148                        client->msg.header.prog, client->msg.header.vers, client->msg.header.serial);
1149         return -1;
1150     }
1151 
1152     VIR_REALLOC_N(thecall->msg->buffer, client->msg.bufferLength);
1153 
1154     memcpy(thecall->msg->buffer, client->msg.buffer, client->msg.bufferLength);
1155     memcpy(&thecall->msg->header, &client->msg.header, sizeof(client->msg.header));
1156     thecall->msg->bufferLength = client->msg.bufferLength;
1157     thecall->msg->bufferOffset = client->msg.bufferOffset;
1158 
1159     thecall->msg->nfds = client->msg.nfds;
1160     thecall->msg->fds = g_steal_pointer(&client->msg.fds);
1161     client->msg.nfds = 0;
1162 
1163     thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
1164 
1165     return 0;
1166 }
1167 
/**
 * virNetClientCallDispatchMessage:
 * @client: RPC client holding the received message in client->msg
 *
 * Hands the received message to the first registered program that
 * virNetClientProgramMatches() accepts.
 *
 * Returns 0 if a program was found and the message dispatched to it,
 * -1 if no registered program matches.
 */
static int virNetClientCallDispatchMessage(virNetClient *client)
{
    size_t idx;

    for (idx = 0; idx < client->nprograms; idx++) {
        virNetClientProgram *candidate = client->programs[idx];

        if (!virNetClientProgramMatches(candidate, &client->msg))
            continue;

        virNetClientProgramDispatch(candidate, client, &client->msg);
        return 0;
    }

    VIR_DEBUG("No program found for event with prog=%d vers=%d",
              client->msg.header.prog, client->msg.header.vers);
    return -1;
}
1190 
/**
 * virNetClientCallCompleteAllWaitingReply:
 * @client: RPC client holding the received message in client->msg
 *
 * Marks as complete every queued call that expects a reply and whose
 * prog, vers and serial equal those of the received message.
 */
static void virNetClientCallCompleteAllWaitingReply(virNetClient *client)
{
    virNetClientCall *cur = client->waitDispatch;

    while (cur) {
        bool sameTarget = cur->msg->header.prog == client->msg.header.prog &&
                          cur->msg->header.vers == client->msg.header.vers &&
                          cur->msg->header.serial == client->msg.header.serial;

        if (sameTarget && cur->expectReply)
            cur->mode = VIR_NET_CLIENT_MODE_COMPLETE;

        cur = cur->next;
    }
}
1203 
/**
 * virNetClientCallDispatchStream:
 * @client: RPC client holding the received stream packet in client->msg
 *
 * Routes a stream packet to the registered stream it matches and
 * updates queued calls according to the packet status:
 *
 *   - VIR_NET_CONTINUE: the packet is queued on the stream and the
 *     first queued call with the same prog/vers/serial that expects a
 *     reply and itself has status VIR_NET_CONTINUE is completed.
 *   - VIR_NET_OK: a queued call with the same prog/vers/serial that
 *     expects a reply and does not have status VIR_NET_CONTINUE must
 *     exist; the stream is marked closed and all matching calls that
 *     expect a reply are completed.
 *   - VIR_NET_ERROR: the error is stored on the stream and all
 *     matching calls that expect a reply are completed.
 *
 * A packet that matches no registered stream is ignored.
 *
 * Returns 0 on success or when the packet was ignored, -1 on failure
 * or unexpected status.
 */
static int virNetClientCallDispatchStream(virNetClient *client)
{
    virNetMessage *incoming = &client->msg;
    virNetClientStream *stream = NULL;
    virNetClientCall *waiter;
    size_t idx;

    /* Work out which registered stream the packet belongs to */
    for (idx = 0; idx < client->nstreams; idx++) {
        if (!virNetClientStreamMatches(client->streams[idx], incoming))
            continue;

        stream = client->streams[idx];
        break;
    }

    if (stream == NULL) {
        VIR_DEBUG("No stream found for packet with prog=%d vers=%d serial=%u proc=%u",
                  client->msg.header.prog, client->msg.header.vers,
                  client->msg.header.serial, client->msg.header.proc);
        /* Not an error: further stream packets can still arrive
         * after the stream has been shut down locally */
        return 0;
    }

    switch (incoming->header.status) {
    case VIR_NET_CONTINUE:
        /* Raw data packet */
        if (virNetClientStreamQueuePacket(stream, incoming) < 0)
            return -1;

        /* Oldest placeholder call waiting for incoming data, if any */
        waiter = client->waitDispatch;
        while (waiter &&
               !(waiter->msg->header.prog == incoming->header.prog &&
                 waiter->msg->header.vers == incoming->header.vers &&
                 waiter->msg->header.serial == incoming->header.serial &&
                 waiter->expectReply &&
                 waiter->msg->header.status == VIR_NET_CONTINUE))
            waiter = waiter->next;

        if (waiter) {
            VIR_DEBUG("Got a new incoming stream data");
            waiter->mode = VIR_NET_CLIENT_MODE_COMPLETE;
        }
        return 0;

    case VIR_NET_OK:
        /* No payload: confirmation of the oldest abort/finish call */
        waiter = client->waitDispatch;
        while (waiter &&
               !(waiter->msg->header.prog == incoming->header.prog &&
                 waiter->msg->header.vers == incoming->header.vers &&
                 waiter->msg->header.serial == incoming->header.serial &&
                 waiter->expectReply &&
                 waiter->msg->header.status != VIR_NET_CONTINUE))
            waiter = waiter->next;

        if (waiter == NULL) {
            VIR_DEBUG("Got unexpected async stream finish confirmation");
            return -1;
        }

        VIR_DEBUG("Got a synchronous abort/finish confirm");

        /* The status of the call we sent tells finish from abort */
        virNetClientStreamSetClosed(stream,
                                    waiter->msg->header.status == VIR_NET_OK ?
                                        VIR_NET_CLIENT_STREAM_CLOSED_FINISHED :
                                        VIR_NET_CLIENT_STREAM_CLOSED_ABORTED);

        virNetClientCallCompleteAllWaitingReply(client);
        return 0;

    case VIR_NET_ERROR:
        /* Followed by a remote_error struct; record it on the stream */
        if (virNetClientStreamSetError(stream, incoming) < 0)
            return -1;

        virNetClientCallCompleteAllWaitingReply(client);
        return 0;

    default:
        VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
                 client->msg.header.serial, client->msg.header.proc,
                 client->msg.header.status);
        return -1;
    }

    return 0;
}
1298 
1299 
1300 static int
virNetClientCallDispatch(virNetClient * client)1301 virNetClientCallDispatch(virNetClient *client)
1302 {
1303     virNetMessage *response = NULL;
1304 
1305     PROBE(RPC_CLIENT_MSG_RX,
1306           "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
1307           client, client->msg.bufferLength,
1308           client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
1309           client->msg.header.type, client->msg.header.status, client->msg.header.serial);
1310 
1311     if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) {
1312         if (response &&
1313             virNetClientQueueNonBlocking(client, response) < 0) {
1314             VIR_WARN("Could not queue keepalive response");
1315             virNetMessageFree(response);
1316         }
1317         return 0;
1318     }
1319 
1320     switch (client->msg.header.type) {
1321     case VIR_NET_REPLY: /* Normal RPC replies */
1322     case VIR_NET_REPLY_WITH_FDS: /* Normal RPC replies with FDs */
1323         return virNetClientCallDispatchReply(client);
1324 
1325     case VIR_NET_MESSAGE: /* Async notifications */
1326         return virNetClientCallDispatchMessage(client);
1327 
1328     case VIR_NET_STREAM: /* Stream protocol */
1329     case VIR_NET_STREAM_HOLE: /* Sparse stream protocol */
1330         return virNetClientCallDispatchStream(client);
1331 
1332     case VIR_NET_CALL:
1333     case VIR_NET_CALL_WITH_FDS:
1334     default:
1335         virReportError(VIR_ERR_RPC,
1336                        _("got unexpected RPC call prog %d vers %d proc %d type %d"),
1337                        client->msg.header.prog, client->msg.header.vers,
1338                        client->msg.header.proc, client->msg.header.type);
1339         return -1;
1340     }
1341 }
1342 
1343 
1344 static ssize_t
virNetClientIOWriteMessage(virNetClient * client,virNetClientCall * thecall)1345 virNetClientIOWriteMessage(virNetClient *client,
1346                            virNetClientCall *thecall)
1347 {
1348     ssize_t ret = 0;
1349 
1350     if (thecall->msg->bufferOffset < thecall->msg->bufferLength) {
1351         ret = virNetSocketWrite(client->sock,
1352                                 thecall->msg->buffer + thecall->msg->bufferOffset,
1353                                 thecall->msg->bufferLength - thecall->msg->bufferOffset);
1354         if (ret <= 0)
1355             return ret;
1356 
1357         thecall->msg->bufferOffset += ret;
1358     }
1359 
1360     if (thecall->msg->bufferOffset == thecall->msg->bufferLength) {
1361         size_t i;
1362         for (i = thecall->msg->donefds; i < thecall->msg->nfds; i++) {
1363             int rv;
1364             if ((rv = virNetSocketSendFD(client->sock, thecall->msg->fds[i])) < 0)
1365                 return -1;
1366             if (rv == 0) /* Blocking */
1367                 return 0;
1368             thecall->msg->donefds++;
1369         }
1370         virNetMessageClearPayload(thecall->msg);
1371         if (thecall->expectReply)
1372             thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
1373         else
1374             thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
1375     }
1376 
1377     return ret;
1378 }
1379 
1380 
1381 static ssize_t
virNetClientIOHandleOutput(virNetClient * client)1382 virNetClientIOHandleOutput(virNetClient *client)
1383 {
1384     virNetClientCall *thecall = client->waitDispatch;
1385 
1386     while (thecall &&
1387            thecall->mode != VIR_NET_CLIENT_MODE_WAIT_TX)
1388         thecall = thecall->next;
1389 
1390     if (!thecall)
1391         return 0; /* This can happen if another thread raced with us and
1392                    * completed the call between the time this thread woke
1393                    * up from poll()ing and the time we locked the client
1394                    */
1395 
1396     while (thecall) {
1397         ssize_t ret = virNetClientIOWriteMessage(client, thecall);
1398         if (ret < 0)
1399             return ret;
1400 
1401         if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
1402             return 0; /* Blocking write, to back to event loop */
1403 
1404         thecall = thecall->next;
1405     }
1406 
1407     return 0; /* No more calls to send, all done */
1408 }
1409 
1410 static ssize_t
virNetClientIOReadMessage(virNetClient * client)1411 virNetClientIOReadMessage(virNetClient *client)
1412 {
1413     size_t wantData;
1414     ssize_t ret;
1415 
1416     /* Start by reading length word */
1417     if (client->msg.bufferLength == 0) {
1418         client->msg.bufferLength = 4;
1419         client->msg.buffer = g_new0(char, client->msg.bufferLength);
1420     }
1421 
1422     wantData = client->msg.bufferLength - client->msg.bufferOffset;
1423 
1424     ret = virNetSocketRead(client->sock,
1425                            client->msg.buffer + client->msg.bufferOffset,
1426                            wantData);
1427     if (ret <= 0)
1428         return ret;
1429 
1430     client->msg.bufferOffset += ret;
1431 
1432     return ret;
1433 }
1434 
1435 
1436 static ssize_t
virNetClientIOHandleInput(virNetClient * client)1437 virNetClientIOHandleInput(virNetClient *client)
1438 {
1439     /* Read as much data as is available, until we get
1440      * EAGAIN
1441      */
1442     for (;;) {
1443         ssize_t ret;
1444 
1445         if (client->msg.nfds == 0) {
1446             ret = virNetClientIOReadMessage(client);
1447 
1448             if (ret < 0)
1449                 return -1;
1450             if (ret == 0)
1451                 return 0;  /* Blocking on read */
1452         }
1453 
1454         /* Check for completion of our goal */
1455         if (client->msg.bufferOffset == client->msg.bufferLength) {
1456             if (client->msg.bufferOffset == 4) {
1457                 ret = virNetMessageDecodeLength(&client->msg);
1458                 if (ret < 0)
1459                     return -1;
1460 
1461                 /*
1462                  * We'll carry on around the loop to immediately
1463                  * process the message body, because it has probably
1464                  * already arrived. Worst case, we'll get EAGAIN on
1465                  * next iteration.
1466                  */
1467             } else {
1468                 if (virNetMessageDecodeHeader(&client->msg) < 0)
1469                     return -1;
1470 
1471                 if (client->msg.header.type == VIR_NET_REPLY_WITH_FDS) {
1472                     size_t i;
1473 
1474                     if (virNetMessageDecodeNumFDs(&client->msg) < 0)
1475                         return -1;
1476 
1477                     for (i = client->msg.donefds; i < client->msg.nfds; i++) {
1478                         int rv;
1479                         if ((rv = virNetSocketRecvFD(client->sock, &(client->msg.fds[i]))) < 0)
1480                             return -1;
1481                         if (rv == 0) /* Blocking */
1482                             break;
1483                         client->msg.donefds++;
1484                     }
1485 
1486                     if (client->msg.donefds < client->msg.nfds) {
1487                         /* Because DecodeHeader/NumFDs reset bufferOffset, we
1488                          * put it back to what it was, so everything works
1489                          * again next time we run this method
1490                          */
1491                         client->msg.bufferOffset = client->msg.bufferLength;
1492                         return 0; /* Blocking on more fds */
1493                     }
1494                 }
1495 
1496                 ret = virNetClientCallDispatch(client);
1497                 virNetMessageClear(&client->msg);
1498                 /*
1499                  * We've completed one call, but we don't want to
1500                  * spin around the loop forever if there are many
1501                  * incoming async events, or replies for other
1502                  * thread's RPC calls. We want to get out & let
1503                  * any other thread take over as soon as we've
1504                  * got our reply. When SASL is active though, we
1505                  * may have read more data off the wire than we
1506                  * initially wanted & cached it in memory. In this
1507                  * case, poll() would not detect that there is more
1508                  * ready todo.
1509                  *
1510                  * So if SASL is active *and* some SASL data is
1511                  * already cached, then we'll process that now,
1512                  * before returning.
1513                  */
1514                 if (ret == 0 &&
1515                     virNetSocketHasCachedData(client->sock))
1516                     continue;
1517                 return ret;
1518             }
1519         }
1520     }
1521 }
1522 
1523 
/**
 * virNetClientIOEventLoopPollEvents:
 * @call: queued call being inspected
 * @opaque: pointer to the GIOCondition mask being accumulated
 *
 * Predicate callback that adds G_IO_IN to the mask for a call waiting
 * to receive and G_IO_OUT for a call waiting to transmit.
 *
 * Always returns false.
 */
static bool virNetClientIOEventLoopPollEvents(virNetClientCall *call,
                                              void *opaque)
{
    GIOCondition *mask = opaque;

    switch (call->mode) {
    case VIR_NET_CLIENT_MODE_WAIT_RX:
        *mask |= G_IO_IN;
        break;
    case VIR_NET_CLIENT_MODE_WAIT_TX:
        *mask |= G_IO_OUT;
        break;
    default:
        break;
    }

    return false;
}
1536 
1537 
/**
 * virNetClientIOEventLoopRemoveDone:
 * @call: queued call being inspected
 * @opaque: the call owned by the thread running the event loop
 *
 * Predicate callback selecting completed calls other than the one
 * passed in @opaque. A selected call that still has a thread attached
 * gets its condition signalled; one without a thread is freed here.
 *
 * Returns true for calls that were selected, false otherwise.
 */
static bool virNetClientIOEventLoopRemoveDone(virNetClientCall *call,
                                              void *opaque)
{
    virNetClientCall *ours = opaque;

    if (call == ours || call->mode != VIR_NET_CLIENT_MODE_COMPLETE)
        return false;

    if (!call->haveThread) {
        /* Nobody is waiting for this call, which should only be the
         * case for calls without replies, so dispose of it here. */
        VIR_DEBUG("Removing completed call %p", call);
        if (call->expectReply)
            VIR_WARN("Got a call expecting a reply but without a waiting thread");
        virCondDestroy(&call->cond);
        virNetMessageFree(call->msg);
        VIR_FREE(call);
        return true;
    }

    /* A thread is sleeping on this call: wake it up. It will not
     * actually run until we release our mutex a short while later. */
    VIR_DEBUG("Waking up sleep %p", call);
    virCondSignal(&call->cond);

    return true;
}
1573 
1574 
1575 static void
virNetClientIODetachNonBlocking(virNetClientCall * call)1576 virNetClientIODetachNonBlocking(virNetClientCall *call)
1577 {
1578     VIR_DEBUG("Keeping unfinished non-blocking call %p in the queue", call);
1579     call->haveThread = false;
1580 }
1581 
1582 
1583 static bool
virNetClientIOEventLoopRemoveAll(virNetClientCall * call,void * opaque)1584 virNetClientIOEventLoopRemoveAll(virNetClientCall *call,
1585                                  void *opaque)
1586 {
1587     virNetClientCall *thiscall = opaque;
1588 
1589     if (call == thiscall)
1590         return false;
1591 
1592     VIR_DEBUG("Removing call %p", call);
1593     virCondDestroy(&call->cond);
1594     virNetMessageFree(call->msg);
1595     VIR_FREE(call);
1596     return true;
1597 }
1598 
1599 
1600 static void
virNetClientIOEventLoopPassTheBuck(virNetClient * client,virNetClientCall * thiscall)1601 virNetClientIOEventLoopPassTheBuck(virNetClient *client,
1602                                    virNetClientCall *thiscall)
1603 {
1604     virNetClientCall *tmp = client->waitDispatch;
1605 
1606     VIR_DEBUG("Giving up the buck %p", thiscall);
1607 
1608     /* See if someone else is still waiting
1609      * and if so, then pass the buck ! */
1610     while (tmp) {
1611         if (tmp != thiscall && tmp->haveThread) {
1612             VIR_DEBUG("Passing the buck to %p", tmp);
1613             virCondSignal(&tmp->cond);
1614             return;
1615         }
1616         tmp = tmp->next;
1617     }
1618     client->haveTheBuck = false;
1619 
1620     VIR_DEBUG("No thread to pass the buck to");
1621     if (client->wantClose) {
1622         virNetClientCloseLocked(client);
1623         virNetClientCallRemovePredicate(&client->waitDispatch,
1624                                         virNetClientIOEventLoopRemoveAll,
1625                                         thiscall);
1626     }
1627 }
1628 
1629 
/* State handed to the socket watch callback virNetClientIOEventFD() */
struct virNetClientIOEventData {
    virNetClient *client; /* client whose eventLoop the callback stops */
    GIOCondition rev;     /* conditions reported by the watch */
};
1634 
1635 static gboolean
virNetClientIOEventFD(int fd G_GNUC_UNUSED,GIOCondition ev,gpointer opaque)1636 virNetClientIOEventFD(int fd G_GNUC_UNUSED,
1637                       GIOCondition ev,
1638                       gpointer opaque)
1639 {
1640     struct virNetClientIOEventData *data = opaque;
1641     data->rev = ev;
1642     g_main_loop_quit(data->client->eventLoop);
1643     return G_SOURCE_REMOVE;
1644 }
1645 
1646 
1647 /*
1648  * Process all calls pending dispatch/receive until we
1649  * get a reply to our own call. Then quit and pass the buck
1650  * to someone else.
1651  *
1652  * Returns 1 if the call was queued and will be completed later (only
1653  * for nonBlock == true), 0 if the call was completed and -1 on error.
1654  */
virNetClientIOEventLoop(virNetClient * client,virNetClientCall * thiscall)1655 static int virNetClientIOEventLoop(virNetClient *client,
1656                                    virNetClientCall *thiscall)
1657 {
1658     bool error = false;
1659     int closeReason;
1660 
1661     for (;;) {
1662 #ifndef WIN32
1663         sigset_t oldmask, blockedsigs;
1664 #endif /* !WIN32 */
1665         int timeout = -1;
1666         virNetMessage *msg = NULL;
1667         g_autoptr(GSource) G_GNUC_UNUSED source = NULL;
1668         GIOCondition ev = 0;
1669         struct virNetClientIOEventData data = {
1670             .client = client,
1671             .rev = 0,
1672         };
1673 
1674         /* If we have existing SASL decoded data we don't want to sleep in
1675          * the poll(), just check if any other FDs are also ready.
1676          * If the connection is going to be closed, we don't want to sleep in
1677          * poll() either.
1678          */
1679         if (virNetSocketHasCachedData(client->sock) || client->wantClose)
1680             timeout = 0;
1681 
1682         /* If we are non-blocking, then we don't want to sleep in poll() */
1683         if (thiscall->nonBlock)
1684             timeout = 0;
1685 
1686         /* Limit timeout so that we can send keepalive request in time */
1687         if (timeout == -1)
1688             timeout = virKeepAliveTimeout(client->keepalive);
1689 
1690         /* Calculate poll events for calls */
1691         virNetClientCallMatchPredicate(client->waitDispatch,
1692                                        virNetClientIOEventLoopPollEvents,
1693                                        &ev);
1694 
1695         /* We have to be prepared to receive stream data
1696          * regardless of whether any of the calls waiting
1697          * for dispatch are for streams.
1698          */
1699         if (client->nstreams)
1700             ev |= G_IO_IN;
1701 
1702         source = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
1703                                             ev,
1704                                             client->eventCtx,
1705                                             virNetClientIOEventFD, &data, NULL);
1706 
1707         /* Release lock while poll'ing so other threads
1708          * can stuff themselves on the queue */
1709         virObjectUnlock(client);
1710 
1711 #ifndef WIN32
1712         /* Block SIGWINCH from interrupting poll in curses programs,
1713          * then restore the original signal mask again immediately
1714          * after the call (RHBZ#567931).  Same for SIGCHLD and SIGPIPE
1715          * at the suggestion of Paolo Bonzini and Daniel Berrange.
1716          */
1717         sigemptyset(&blockedsigs);
1718 # ifdef SIGWINCH
1719         sigaddset(&blockedsigs, SIGWINCH);
1720 # endif
1721 # ifdef SIGCHLD
1722         sigaddset(&blockedsigs, SIGCHLD);
1723 # endif
1724         sigaddset(&blockedsigs, SIGPIPE);
1725 
1726         ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
1727 #endif /* !WIN32 */
1728 
1729         g_main_loop_run(client->eventLoop);
1730 
1731 #ifndef WIN32
1732         ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
1733 #endif /* !WIN32 */
1734 
1735         virObjectLock(client);
1736 
1737         if (virKeepAliveTrigger(client->keepalive, &msg)) {
1738             virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
1739         } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
1740             VIR_WARN("Could not queue keepalive request");
1741             virNetMessageFree(msg);
1742         }
1743 
1744         /* If we have existing SASL decoded data, pretend
1745          * the socket became readable so we consume it
1746          */
1747         if (virNetSocketHasCachedData(client->sock))
1748             data.rev |= G_IO_IN;
1749 
1750         /* If wantClose flag is set, pretend there was an error on the socket,
1751          * but still read and process any data we received so far.
1752          */
1753         if (client->wantClose)
1754             error = true;
1755 
1756         if (data.rev & G_IO_HUP)
1757             closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
1758         else
1759             closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;
1760 
1761         if (data.rev & G_IO_OUT) {
1762             if (virNetClientIOHandleOutput(client) < 0) {
1763                 virNetClientMarkClose(client, closeReason);
1764                 error = true;
1765                 /* Fall through to process any pending data. */
1766             }
1767         }
1768 
1769         if (data.rev & G_IO_IN) {
1770             if (virNetClientIOHandleInput(client) < 0) {
1771                 virNetClientMarkClose(client, closeReason);
1772                 error = true;
1773                 /* Fall through to process any pending data. */
1774             }
1775         }
1776 
1777         /* Iterate through waiting calls and if any are
1778          * complete, remove them from the dispatch list.
1779          */
1780         virNetClientCallRemovePredicate(&client->waitDispatch,
1781                                         virNetClientIOEventLoopRemoveDone,
1782                                         thiscall);
1783 
1784         /* Now see if *we* are done */
1785         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
1786             virNetClientCallRemove(&client->waitDispatch, thiscall);
1787             virNetClientIOEventLoopPassTheBuck(client, thiscall);
1788             return 0;
1789         }
1790 
1791         /* We're not done, but we're non-blocking; keep the call queued */
1792         if (thiscall->nonBlock) {
1793             virNetClientIODetachNonBlocking(thiscall);
1794             virNetClientIOEventLoopPassTheBuck(client, thiscall);
1795             return 1;
1796         }
1797 
1798         if (error)
1799             goto error;
1800 
1801         if (data.rev & G_IO_HUP) {
1802             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
1803                            _("received hangup event on socket"));
1804             virNetClientMarkClose(client, closeReason);
1805             goto error;
1806         }
1807         if (data.rev & G_IO_ERR) {
1808             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
1809                            _("received error event on socket"));
1810             virNetClientMarkClose(client, closeReason);
1811             goto error;
1812         }
1813     }
1814 
1815  error:
1816     if (client->error) {
1817         VIR_DEBUG("error on socket: %s", client->error->message);
1818         virSetError(client->error);
1819     }
1820     virNetClientCallRemove(&client->waitDispatch, thiscall);
1821     virNetClientIOEventLoopPassTheBuck(client, thiscall);
1822     return -1;
1823 }
1824 
1825 
1826 static bool
virNetClientIOUpdateEvents(virNetClientCall * call,void * opaque)1827 virNetClientIOUpdateEvents(virNetClientCall *call,
1828                            void *opaque)
1829 {
1830     int *events = opaque;
1831 
1832     if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
1833         *events |= VIR_EVENT_HANDLE_WRITABLE;
1834 
1835     return false;
1836 }
1837 
1838 
/*
 * Refresh the set of events the socket I/O callback is watching for.
 *
 * @client: the client whose socket callback is updated
 * @enableCallback: false to watch for no events at all; true to watch
 *                  for readability, plus writability when any call in
 *                  the wait queue is still in WAIT_TX mode
 *
 * Does nothing once a close has been requested on the client.
 */
static void virNetClientIOUpdateCallback(virNetClient *client,
                                         bool enableCallback)
{
    int mask = 0;

    if (client->wantClose)
        return;

    if (!enableCallback) {
        /* Callback disabled: register an empty event mask. */
        virNetSocketUpdateIOCallback(client->sock, mask);
        return;
    }

    mask = VIR_EVENT_HANDLE_READABLE;

    /* Let every queued call contribute the events it still needs. */
    virNetClientCallMatchPredicate(client->waitDispatch,
                                   virNetClientIOUpdateEvents,
                                   &mask);

    virNetSocketUpdateIOCallback(client->sock, mask);
}
1856 
1857 
1858 /*
1859  * This function sends a message to remote server and awaits a reply
1860  *
1861  * NB. This does not free the args structure (not desirable, since you
1862  * often want this allocated on the stack or else it contains strings
1863  * which come from the user).  It does however free any intermediate
1864  * results, eg. the error structure if there is one.
1865  *
1866  * NB(2). Make sure to memset (&ret, 0, sizeof(ret)) before calling,
1867  * else Bad Things will happen in the XDR code.
1868  *
1869  * NB(3) You must have the client lock before calling this
1870  *
 * NB(4) This is very complicated. Multiple threads are allowed to
 * use the client for RPC at the same time, but only one of them
 * can use the socket at any moment. So if someone's using the
 * socket, other threads are put to sleep on condition variables.
 * The existing thread may completely send & receive their RPC
 * call/reply while they're asleep. Or it may only get around to
 * dealing with sending the call. Or it may get around to neither.
 * So upon waking up from slumber, the other thread may or may not
 * have more work to do.
1879  *
1880  * We call this dance  'passing the buck'
1881  *
1882  *      https://en.wikipedia.org/wiki/Passing_the_buck
1883  *
1884  *   "Buck passing or passing the buck is the action of transferring
1885  *    responsibility or blame unto another person. It is also used as
1886  *    a strategy in power politics when the actions of one country/
1887  *    nation are blamed on another, providing an opportunity for war."
1888  *
1889  * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller
1890  * must *NOT* free it, if this returns '1' (ie partial send).
1891  *
1892  * NB(6) The following input states are valid if *no* threads
1893  *       are currently executing this method
1894  *
1895  *   - waitDispatch == NULL,
1896  *   - waitDispatch != NULL, waitDispatch.nonBlock == true
1897  *
1898  * The following input states are valid, if n threads are currently
1899  * executing
1900  *
1901  *   - waitDispatch != NULL
1902  *   - 0 or 1  waitDispatch.nonBlock == false, without any threads
1903  *   - 0 or more waitDispatch.nonBlock == false, with threads
1904  *
1905  * The following output states are valid when all threads are done
1906  *
1907  *   - waitDispatch == NULL,
1908  *   - waitDispatch != NULL, waitDispatch.nonBlock == true
1909  *
1910  * NB(7) Don't Panic!
1911  *
1912  * Returns 1 if the call was queued and will be completed later (only
1913  * for nonBlock == true), 0 if the call was completed and -1 on error.
1914  */
static int virNetClientIO(virNetClient *client,
                          virNetClientCall *thiscall)
{
    int ret = -1;

    VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p",
              thiscall->msg->header.prog,
              thiscall->msg->header.vers,
              thiscall->msg->header.serial,
              thiscall->msg->header.proc,
              thiscall->msg->header.type,
              thiscall->msg->bufferLength,
              client->waitDispatch);

    /* Append this call to the wait queue before deciding who dispatches */
    virNetClientCallQueue(&client->waitDispatch, thiscall);

    if (!client->haveTheBuck) {
        /* Nobody is dispatching right now, so we become the dispatcher */
        client->haveTheBuck = true;
    } else {
        /* Another thread is dispatching: kick it out of its poll so it
         * notices the newly queued call */
        g_main_loop_quit(client->eventLoop);

        /* A non-blocking call never waits; it is detached from this
         * thread and left in the queue for the dispatcher */
        if (thiscall->nonBlock) {
            virNetClientIODetachNonBlocking(thiscall);
            ret = 1;
            goto cleanup;
        }

        VIR_DEBUG("Going to sleep head=%p call=%p",
                  client->waitDispatch, thiscall);

        /* Sleep on our condition while the other thread does the work */
        if (virCondWait(&thiscall->cond, &client->parent.lock) < 0) {
            virNetClientCallRemove(&client->waitDispatch, thiscall);
            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                           _("failed to wait on condition"));
            return -1;
        }

        VIR_DEBUG("Woken up from sleep head=%p call=%p",
                  client->waitDispatch, thiscall);

        /* Two ways we can have been woken up:
         *  1. The other thread already completed our call for us
         *  2. The other thread is finished and handed the dispatcher
         *     role to us, so we must finish waiting for our reply
         */
        if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
            /* Case 1: our call has already been taken off the list,
             * so only the result needs to be reported */
            ret = 0;
            goto cleanup;
        }

        /* Case 2: the buck was passed to us, fall through and dispatch */
    }

    VIR_DEBUG("We have the buck head=%p call=%p",
              client->waitDispatch, thiscall);

    /*
     * From here on this thread owns the dispatch process.
     *
     * Disable the socket callback first: when this call is made from
     * a thread other than the event loop thread, callback wake-ups
     * would only leave the event loop thread blocked on the mutex
     * for the duration of the call.
     */
    virNetClientIOUpdateCallback(client, false);

    ret = virNetClientIOEventLoop(client, thiscall);

    /* Re-arm the callback only if the socket is still there */
    if (client->sock)
        virNetClientIOUpdateCallback(client, true);

 cleanup:
    VIR_DEBUG("All done with our call head=%p call=%p rv=%d",
              client->waitDispatch, thiscall, ret);
    return ret;
}
2007 
2008 
/*
 * Socket event callback for a client.
 *
 * @sock: socket the event fired on (only used for debug logging here)
 * @events: bitmask of VIR_EVENT_HANDLE_* flags that fired
 * @opaque: the virNetClient the socket belongs to; must not be NULL
 *
 * Runs with the client locked for its whole duration. Events are
 * ignored when the socket is already gone, when another thread holds
 * the buck, or when a close is already pending. Otherwise pending
 * output and input are processed, and any I/O failure or
 * hangup/error event marks the client for closing.
 *
 * On exit, if a close is wanted and no thread holds the buck, the
 * client is closed and the wait queue is run through
 * virNetClientIOEventLoopRemoveAll.
 */
void virNetClientIncomingEvent(virNetSocket *sock,
                               int events,
                               void *opaque)
{
    virNetClient *client = opaque;
    int closeReason;

    virObjectLock(client);

    /* client is dereferenced unconditionally right below, so a NULL
     * check here would be dead code and only mislead readers and
     * static analyzers. */
    VIR_DEBUG("client=%p wantclose=%d", client, client->wantClose);

    if (!client->sock)
        goto done;

    /* Whoever holds the buck is handling the socket; a pending close
     * is dealt with at the 'done' label. */
    if (client->haveTheBuck || client->wantClose)
        goto done;

    VIR_DEBUG("Event fired %p %d", sock, events);

    /* A hangup is reported as EOF, everything else as a generic error */
    if (events & VIR_EVENT_HANDLE_HANGUP)
        closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
    else
        closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;

    if (events & VIR_EVENT_HANDLE_WRITABLE) {
        if (virNetClientIOHandleOutput(client) < 0)
            virNetClientMarkClose(client, closeReason);
    }

    /* Input is still processed after an output failure so data that
     * already arrived is not lost. */
    if (events & VIR_EVENT_HANDLE_READABLE) {
        if (virNetClientIOHandleInput(client) < 0)
            virNetClientMarkClose(client, closeReason);
    }

    if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
        VIR_DEBUG("VIR_EVENT_HANDLE_HANGUP or "
                  "VIR_EVENT_HANDLE_ERROR encountered");
        virNetClientMarkClose(client, closeReason);
        goto done;
    }

    /* Remove completed calls or signal their threads. */
    virNetClientCallRemovePredicate(&client->waitDispatch,
                                    virNetClientIOEventLoopRemoveDone,
                                    NULL);
    virNetClientIOUpdateCallback(client, true);

 done:
    if (client->wantClose && !client->haveTheBuck) {
        virNetClientCloseLocked(client);
        virNetClientCallRemovePredicate(&client->waitDispatch,
                                        virNetClientIOEventLoopRemoveAll,
                                        NULL);
    }
    virObjectUnlock(client);
}
2065 
2066 
2067 static virNetClientCall *
virNetClientCallNew(virNetMessage * msg,bool expectReply,bool nonBlock)2068 virNetClientCallNew(virNetMessage *msg,
2069                     bool expectReply,
2070                     bool nonBlock)
2071 {
2072     virNetClientCall *call = NULL;
2073 
2074     if (expectReply &&
2075         (msg->bufferLength != 0) &&
2076         (msg->header.status == VIR_NET_CONTINUE)) {
2077         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2078                        _("Attempt to send an asynchronous message with"
2079                          " a synchronous reply"));
2080         goto error;
2081     }
2082 
2083     if (expectReply && nonBlock) {
2084         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2085                        _("Attempt to send a non-blocking message with"
2086                          " a synchronous reply"));
2087         goto error;
2088     }
2089 
2090     call = g_new0(virNetClientCall, 1);
2091 
2092     if (virCondInit(&call->cond) < 0) {
2093         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2094                        _("cannot initialize condition variable"));
2095         goto error;
2096     }
2097 
2098     msg->donefds = 0;
2099     if (msg->bufferLength)
2100         call->mode = VIR_NET_CLIENT_MODE_WAIT_TX;
2101     else
2102         call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
2103     call->msg = msg;
2104     call->expectReply = expectReply;
2105     call->nonBlock = nonBlock;
2106 
2107     VIR_DEBUG("New call %p: msg=%p, expectReply=%d, nonBlock=%d",
2108               call, msg, expectReply, nonBlock);
2109 
2110     return call;
2111 
2112  error:
2113     VIR_FREE(call);
2114     return NULL;
2115 }
2116 
2117 
2118 static int
virNetClientQueueNonBlocking(virNetClient * client,virNetMessage * msg)2119 virNetClientQueueNonBlocking(virNetClient *client,
2120                              virNetMessage *msg)
2121 {
2122     virNetClientCall *call;
2123 
2124     PROBE(RPC_CLIENT_MSG_TX_QUEUE,
2125           "client=%p len=%zu prog=%u vers=%u proc=%u"
2126           " type=%u status=%u serial=%u",
2127           client, msg->bufferLength,
2128           msg->header.prog, msg->header.vers, msg->header.proc,
2129           msg->header.type, msg->header.status, msg->header.serial);
2130 
2131     if (!(call = virNetClientCallNew(msg, false, true)))
2132         return -1;
2133 
2134     virNetClientCallQueue(&client->waitDispatch, call);
2135     return 0;
2136 }
2137 
2138 
2139 /*
2140  * Returns 1 if the call was queued and will be completed later (only
2141  * for nonBlock == true), 0 if the call was completed and -1 on error.
2142  */
virNetClientSendInternal(virNetClient * client,virNetMessage * msg,bool expectReply,bool nonBlock)2143 static int virNetClientSendInternal(virNetClient *client,
2144                                     virNetMessage *msg,
2145                                     bool expectReply,
2146                                     bool nonBlock)
2147 {
2148     virNetClientCall *call;
2149     int ret = -1;
2150 
2151     PROBE(RPC_CLIENT_MSG_TX_QUEUE,
2152           "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
2153           client, msg->bufferLength,
2154           msg->header.prog, msg->header.vers, msg->header.proc,
2155           msg->header.type, msg->header.status, msg->header.serial);
2156 
2157     if (!client->sock || client->wantClose) {
2158         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2159                        _("client socket is closed"));
2160         return -1;
2161     }
2162 
2163     if (!(call = virNetClientCallNew(msg, expectReply, nonBlock)))
2164         return -1;
2165 
2166     call->haveThread = true;
2167     ret = virNetClientIO(client, call);
2168 
2169     /* If queued, the call will be finished and freed later by another thread;
2170      * we're done. */
2171     if (ret == 1)
2172         return 1;
2173 
2174     virCondDestroy(&call->cond);
2175     VIR_FREE(call);
2176     return ret;
2177 }
2178 
2179 
2180 /*
2181  * @msg: a message allocated on heap or stack
2182  *
2183  * Send a message synchronously, and wait for the reply synchronously
2184  *
2185  * The caller is responsible for free'ing @msg if it was allocated
2186  * on the heap
2187  *
2188  * Returns 0 on success, -1 on failure
2189  */
virNetClientSendWithReply(virNetClient * client,virNetMessage * msg)2190 int virNetClientSendWithReply(virNetClient *client,
2191                               virNetMessage *msg)
2192 {
2193     int ret;
2194     virObjectLock(client);
2195     ret = virNetClientSendInternal(client, msg, true, false);
2196     virObjectUnlock(client);
2197     if (ret < 0)
2198         return -1;
2199     return 0;
2200 }
2201 
2202 
2203 /*
2204  * @msg: a message allocated on the heap.
2205  *
2206  * Send a message asynchronously, without any reply
2207  *
2208  * The caller is responsible for free'ing @msg, *except* if
2209  * this method returns 1.
2210  *
2211  * Returns 1 if the message was queued and will be completed later (only
2212  * for nonBlock == true), 0 if the message was completed and -1 on error.
2213  */
virNetClientSendNonBlock(virNetClient * client,virNetMessage * msg)2214 int virNetClientSendNonBlock(virNetClient *client,
2215                              virNetMessage *msg)
2216 {
2217     int ret;
2218     virObjectLock(client);
2219     ret = virNetClientSendInternal(client, msg, false, true);
2220     virObjectUnlock(client);
2221     return ret;
2222 }
2223 
2224 /*
2225  * @msg: a message allocated on heap or stack
2226  *
2227  * Send a message synchronously, and wait for the reply synchronously if
2228  * message is dummy (just to wait for incoming data) or abort/finish message.
2229  *
2230  * The caller is responsible for free'ing @msg if it was allocated
2231  * on the heap
2232  *
2233  * Returns 0 on success, -1 on failure
2234  */
virNetClientSendStream(virNetClient * client,virNetMessage * msg,virNetClientStream * st)2235 int virNetClientSendStream(virNetClient *client,
2236                            virNetMessage *msg,
2237                            virNetClientStream *st)
2238 {
2239     int ret = -1;
2240     bool expectReply = !msg->bufferLength ||
2241                        msg->header.status != VIR_NET_CONTINUE;
2242 
2243     virObjectLock(client);
2244 
2245     if (virNetClientStreamCheckState(st) < 0)
2246         goto cleanup;
2247 
2248     /* Check for EOF only if we are going to wait for incoming data */
2249     if (!msg->bufferLength && virNetClientStreamEOF(st)) {
2250         ret = 0;
2251         goto cleanup;
2252     }
2253 
2254     if (virNetClientSendInternal(client, msg, expectReply, false) < 0)
2255         goto cleanup;
2256 
2257     if (expectReply && virNetClientStreamCheckSendStatus(st, msg) < 0)
2258         goto cleanup;
2259 
2260     ret = 0;
2261 
2262  cleanup:
2263     virObjectUnlock(client);
2264 
2265     return ret;
2266 }
2267