1 /*
2 * virnetclient.c: generic network RPC client
3 *
4 * Copyright (C) 2006-2014 Red Hat, Inc.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library. If not, see
18 * <http://www.gnu.org/licenses/>.
19 */
20
21 #include <config.h>
22
23 #include <unistd.h>
24 #include <signal.h>
25 #include <fcntl.h>
26
27 #include "virnetclient.h"
28 #include "virnetsocket.h"
29 #include "virkeepalive.h"
30 #include "viralloc.h"
31 #include "virthread.h"
32 #include "virfile.h"
33 #include "virlog.h"
34 #include "virutil.h"
35 #include "virerror.h"
36 #include "virprobe.h"
37 #include "virstring.h"
38 #include "vireventglibwatch.h"
39
40 #define VIR_FROM_THIS VIR_FROM_RPC
41
42 VIR_LOG_INIT("rpc.netclient");
43
44 typedef struct _virNetClientCall virNetClientCall;
45
/*
 * Values stored in virNetClientCall.mode.
 *
 * NOTE(review): judging by the names, WAIT_TX / WAIT_RX mean the call is
 * still waiting to be transmitted / waiting for its reply; confirm against
 * the I/O loop. COMPLETE is what the reply dispatch code sets once a call
 * has been matched with its reply.
 */
enum {
    VIR_NET_CLIENT_MODE_WAIT_TX,
    VIR_NET_CLIENT_MODE_WAIT_RX,
    VIR_NET_CLIENT_MODE_COMPLETE,
};
51
52 VIR_ENUM_IMPL(virNetClientProxy,
53 VIR_NET_CLIENT_PROXY_LAST,
54 "auto", "netcat", "native");
55
/* One queued RPC call; calls are chained into a singly linked list. */
struct _virNetClientCall {
    /* One of the VIR_NET_CLIENT_MODE_* values */
    int mode;

    /* Message belonging to this call; its header (prog/vers/serial) is
     * what incoming replies are matched against */
    virNetMessage *msg;
    /* Only calls with this flag set are completed by
     * virNetClientCallCompleteAllWaitingReply() */
    bool expectReply;
    /* NOTE(review): presumably marks a call queued without blocking the
     * caller - confirm against the send path */
    bool nonBlock;
    /* NOTE(review): presumably true while a thread is waiting on 'cond' -
     * confirm against the I/O loop */
    bool haveThread;

    virCond cond;

    /* Next call in the list, NULL for the last one */
    virNetClientCall *next;
};
68
69
/* RPC client object; a lockable virObject (see 'parent'). */
struct _virNetClient {
    virObjectLockable parent;

    /* Connection socket; set at creation and reset to NULL when the
     * client is closed */
    virNetSocket *sock;
    /* Set once virNetClientRegisterAsyncIO() has installed its callback */
    bool asyncIO;

    /* TLS session, created by virNetClientSetTLSSession() */
    virNetTLSSession *tls;
    /* Private copy of the hostname given at creation (may be NULL);
     * passed to virNetTLSSessionNew() */
    char *hostname;

    /* Programs added with virNetClientAddProgram(); each entry is a
     * reference owned by the client */
    virNetClientProgram **programs;
    size_t nprograms;

    /* For incoming message packets */
    virNetMessage msg;

#if WITH_SASL
    /* SASL session, set by virNetClientSetSASLSession() */
    virNetSASLSession *sasl;
#endif

    /* Main loop and its context, both created in virNetClientNew() */
    GMainLoop *eventLoop;
    GMainContext *eventCtx;

    /*
     * List of calls currently waiting for dispatch
     * The calls should all have threads waiting for
     * them, except possibly the first call in the list
     * which might be a partially sent non-blocking call.
     */
    virNetClientCall *waitDispatch;
    /* True if a thread holds the buck */
    bool haveTheBuck;

    /* Streams added with virNetClientAddStream(); each entry is a
     * reference owned by the client */
    size_t nstreams;
    virNetClientStream **streams;

    /* Keepalive helper, created by virNetClientRegisterKeepAlive() */
    virKeepAlive *keepalive;
    /* Close request state recorded by virNetClientMarkClose() */
    bool wantClose;
    int closeReason;
    virErrorPtr error;

    /* Close notification set by virNetClientSetCloseCallback(); closeFf,
     * if set, is called on closeOpaque when the client is disposed */
    virNetClientCloseFunc closeCb;
    void *closeOpaque;
    virFreeCallback closeFf;
};
114
115
116 static virClass *virNetClientClass;
117 static void virNetClientDispose(void *obj);
118
virNetClientOnceInit(void)119 static int virNetClientOnceInit(void)
120 {
121 if (!VIR_CLASS_NEW(virNetClient, virClassForObjectLockable()))
122 return -1;
123
124 return 0;
125 }
126
127 VIR_ONCE_GLOBAL_INIT(virNetClient);
128
129 static void virNetClientIOEventLoopPassTheBuck(virNetClient *client,
130 virNetClientCall *thiscall);
131 static int virNetClientQueueNonBlocking(virNetClient *client,
132 virNetMessage *msg);
133 static void virNetClientCloseInternal(virNetClient *client,
134 int reason);
135
136
/*
 * Install the close notification for @client.
 *
 * @cb:     function invoked when the client gets closed (may be NULL)
 * @opaque: data handed to @cb
 * @ff:     function called on @opaque when the client is disposed
 *
 * The three values are stored together while holding the client lock.
 * Any previously stored values are simply overwritten.
 */
void virNetClientSetCloseCallback(virNetClient *client,
                                  virNetClientCloseFunc cb,
                                  void *opaque,
                                  virFreeCallback ff)
{
    virObjectLock(client);

    client->closeFf = ff;
    client->closeOpaque = opaque;
    client->closeCb = cb;

    virObjectUnlock(client);
}
148
149
150 static void virNetClientIncomingEvent(virNetSocket *sock,
151 int events,
152 void *opaque);
153
154 /* Append a call to the end of the list */
virNetClientCallQueue(virNetClientCall ** head,virNetClientCall * call)155 static void virNetClientCallQueue(virNetClientCall **head,
156 virNetClientCall *call)
157 {
158 virNetClientCall *tmp = *head;
159 while (tmp && tmp->next)
160 tmp = tmp->next;
161 if (tmp)
162 tmp->next = call;
163 else
164 *head = call;
165 call->next = NULL;
166 }
167
168 #if 0
169 /* Obtain a call from the head of the list */
170 static virNetClientCall *virNetClientCallServe(virNetClientCall **head)
171 {
172 virNetClientCall *tmp = *head;
173 if (tmp)
174 *head = tmp->next;
175 else
176 *head = NULL;
177 tmp->next = NULL;
178 return tmp;
179 }
180 #endif
181
182 /* Remove a call from anywhere in the list */
virNetClientCallRemove(virNetClientCall ** head,virNetClientCall * call)183 static void virNetClientCallRemove(virNetClientCall **head,
184 virNetClientCall *call)
185 {
186 virNetClientCall *tmp = *head;
187 virNetClientCall *prev = NULL;
188 while (tmp) {
189 if (tmp == call) {
190 if (prev)
191 prev->next = g_steal_pointer(&tmp->next);
192 else
193 *head = g_steal_pointer(&tmp->next);
194 return;
195 }
196 prev = tmp;
197 tmp = tmp->next;
198 }
199 }
200
201 /* Predicate returns true if matches */
202 typedef bool (*virNetClientCallPredicate)(virNetClientCall *call, void *opaque);
203
204 /* Remove a list of calls from the list based on a predicate */
virNetClientCallRemovePredicate(virNetClientCall ** head,virNetClientCallPredicate pred,void * opaque)205 static void virNetClientCallRemovePredicate(virNetClientCall **head,
206 virNetClientCallPredicate pred,
207 void *opaque)
208 {
209 virNetClientCall *tmp = *head;
210 virNetClientCall *prev = NULL;
211 while (tmp) {
212 virNetClientCall *next = tmp->next;
213 tmp->next = NULL; /* Temp unlink */
214 if (pred(tmp, opaque)) {
215 if (prev)
216 prev->next = next;
217 else
218 *head = next;
219 } else {
220 tmp->next = next; /* Reverse temp unlink */
221 prev = tmp;
222 }
223 tmp = next;
224 }
225 }
226
227 /* Returns true if the predicate matches at least one call in the list */
virNetClientCallMatchPredicate(virNetClientCall * head,virNetClientCallPredicate pred,void * opaque)228 static bool virNetClientCallMatchPredicate(virNetClientCall *head,
229 virNetClientCallPredicate pred,
230 void *opaque)
231 {
232 virNetClientCall *tmp = head;
233 while (tmp) {
234 if (pred(tmp, opaque))
235 return true;
236 tmp = tmp->next;
237 }
238 return false;
239 }
240
241
242 bool
virNetClientKeepAliveIsSupported(virNetClient * client)243 virNetClientKeepAliveIsSupported(virNetClient *client)
244 {
245 bool supported;
246
247 virObjectLock(client);
248 supported = !!client->keepalive;
249 virObjectUnlock(client);
250
251 return supported;
252 }
253
254 int
virNetClientKeepAliveStart(virNetClient * client,int interval,unsigned int count)255 virNetClientKeepAliveStart(virNetClient *client,
256 int interval,
257 unsigned int count)
258 {
259 int ret;
260
261 virObjectLock(client);
262 ret = virKeepAliveStart(client->keepalive, interval, count);
263 virObjectUnlock(client);
264
265 return ret;
266 }
267
268 void
virNetClientKeepAliveStop(virNetClient * client)269 virNetClientKeepAliveStop(virNetClient *client)
270 {
271 virObjectLock(client);
272 virKeepAliveStop(client->keepalive);
273 virObjectUnlock(client);
274 }
275
276 static void
virNetClientKeepAliveDeadCB(void * opaque)277 virNetClientKeepAliveDeadCB(void *opaque)
278 {
279 virNetClientCloseInternal(opaque, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
280 }
281
282 static int
virNetClientKeepAliveSendCB(void * opaque,virNetMessage * msg)283 virNetClientKeepAliveSendCB(void *opaque,
284 virNetMessage *msg)
285 {
286 int ret;
287
288 ret = virNetClientSendNonBlock(opaque, msg);
289 if (ret != -1 && ret != 1)
290 virNetMessageFree(msg);
291 return ret;
292 }
293
virNetClientNew(virNetSocket * sock,const char * hostname)294 static virNetClient *virNetClientNew(virNetSocket *sock,
295 const char *hostname)
296 {
297 virNetClient *client = NULL;
298
299 if (virNetClientInitialize() < 0)
300 goto error;
301
302 if (!(client = virObjectLockableNew(virNetClientClass)))
303 goto error;
304
305 client->sock = g_steal_pointer(&sock);
306
307 client->eventCtx = g_main_context_new();
308 client->eventLoop = g_main_loop_new(client->eventCtx, FALSE);
309
310 client->hostname = g_strdup(hostname);
311
312 PROBE(RPC_CLIENT_NEW,
313 "client=%p sock=%p",
314 client, client->sock);
315 return client;
316
317 error:
318 virObjectUnref(client);
319 virObjectUnref(sock);
320 return NULL;
321 }
322
323 /*
324 * Check whether the specified SSH key exists.
325 *
326 * Return -1 on error, 0 if it does not exist, and 1 if it does exist.
327 */
328 static int
virNetClientCheckKeyExists(const char * homedir,const char * name,char ** retPath)329 virNetClientCheckKeyExists(const char *homedir,
330 const char *name,
331 char **retPath)
332 {
333 char *path;
334
335 path = g_strdup_printf("%s/.ssh/%s", homedir, name);
336
337 if (!(virFileExists(path))) {
338 VIR_FREE(path);
339 return 0;
340 }
341
342 *retPath = path;
343 return 1;
344 }
345
/*
 * Detect the default SSH key, if existing.
 *
 * The well-known key file names are probed under "<homedir>/.ssh" in a
 * fixed order and the search stops at the first probe that does not
 * return 0. On a hit @retPath receives the allocated path of the key.
 *
 * Return -1 on error, 0 if it does not exist, and 1 if it does exist.
 */
static int
virNetClientFindDefaultSshKey(const char *homedir, char **retPath)
{
    static const char *const candidates[] = {
        "identity", "id_dsa", "id_ecdsa", "id_ed25519", "id_rsa",
    };
    size_t n;

    for (n = 0; n < G_N_ELEMENTS(candidates); n++) {
        int rc = virNetClientCheckKeyExists(homedir, candidates[n], retPath);

        if (rc != 0)
            return rc;
    }

    return 0;
}
366
367
virNetClientNewUNIX(const char * path,const char * spawnDaemonPath)368 virNetClient *virNetClientNewUNIX(const char *path,
369 const char *spawnDaemonPath)
370 {
371 virNetSocket *sock;
372
373 if (virNetSocketNewConnectUNIX(path, spawnDaemonPath, &sock) < 0)
374 return NULL;
375
376 return virNetClientNew(sock, NULL);
377 }
378
379
virNetClientNewTCP(const char * nodename,const char * service,int family)380 virNetClient *virNetClientNewTCP(const char *nodename,
381 const char *service,
382 int family)
383 {
384 virNetSocket *sock;
385
386 if (virNetSocketNewConnectTCP(nodename, service,
387 family,
388 &sock) < 0)
389 return NULL;
390
391 return virNetClientNew(sock, nodename);
392 }
393
394
395 /*
396 * The SSH Server uses shell to spawn the command we give
397 * it. Our command then invokes shell again. Thus we need
398 * to apply two levels of escaping, so that commands with
399 * whitespace in their path get correctly interpreted.
400 */
401 static char *
virNetClientDoubleEscapeShell(const char * str)402 virNetClientDoubleEscapeShell(const char *str)
403 {
404 virBuffer buf = VIR_BUFFER_INITIALIZER;
405 g_autofree char *tmp = NULL;
406
407 virBufferEscapeShell(&buf, str);
408
409 tmp = virBufferContentAndReset(&buf);
410
411 virBufferEscapeShell(&buf, tmp);
412
413 return virBufferContentAndReset(&buf);
414 }
415
416 char *
virNetClientSSHHelperCommand(virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly)417 virNetClientSSHHelperCommand(virNetClientProxy proxy,
418 const char *netcatPath,
419 const char *socketPath,
420 const char *driverURI,
421 bool readonly)
422 {
423 g_autofree char *netcatPathSafe = virNetClientDoubleEscapeShell(netcatPath ? netcatPath : "nc");
424 g_autofree char *driverURISafe = virNetClientDoubleEscapeShell(driverURI);
425 g_autofree char *nccmd = NULL;
426 g_autofree char *helpercmd = NULL;
427
428 /* If user gave a 'netcat' path in the URI, we must
429 * assume they want the legacy 'nc' based proxy, not
430 * our new virt-ssh-helper
431 */
432 if (proxy == VIR_NET_CLIENT_PROXY_AUTO &&
433 netcatPath != NULL) {
434 proxy = VIR_NET_CLIENT_PROXY_NETCAT;
435 }
436
437 nccmd = g_strdup_printf(
438 "if '%s' -q 2>&1 | grep \"requires an argument\" >/dev/null 2>&1; then "
439 "ARG=-q0;"
440 "else "
441 "ARG=;"
442 "fi;"
443 "'%s' $ARG -U %s",
444 netcatPathSafe, netcatPathSafe, socketPath);
445
446 helpercmd = g_strdup_printf("virt-ssh-helper%s'%s'",
447 readonly ? " -r " : " ",
448 driverURISafe);
449
450 switch (proxy) {
451 case VIR_NET_CLIENT_PROXY_AUTO:
452 return g_strdup_printf("sh -c 'which virt-ssh-helper 1>/dev/null 2>&1; "
453 "if test $? = 0; then "
454 " %s; "
455 "else"
456 " %s; "
457 "fi'", helpercmd, nccmd);
458
459 case VIR_NET_CLIENT_PROXY_NETCAT:
460 return g_strdup_printf("sh -c '%s'", nccmd);
461
462 case VIR_NET_CLIENT_PROXY_NATIVE:
463 if (netcatPath) {
464 virReportError(VIR_ERR_INVALID_ARG, "%s",
465 _("netcat path not valid with native proxy mode"));
466 return NULL;
467 }
468 return g_strdup_printf("sh -c '%s'", helpercmd);
469
470 case VIR_NET_CLIENT_PROXY_LAST:
471 default:
472 virReportEnumRangeError(virNetClientProxy, proxy);
473 return NULL;
474 }
475 }
476
477
/*
 * Assign VAL to VAR if VAR is currently NULL/zero.
 *
 * Wrapped in do { } while (0) so that "DEFAULT_VALUE(x, y);" is a single
 * statement and can be used safely as the body of an if/else; the
 * arguments are parenthesized so that expressions expand as intended.
 * VAR is evaluated more than once and must be a plain lvalue.
 */
#define DEFAULT_VALUE(VAR, VAL) \
    do { \
        if (!(VAR)) \
            (VAR) = (VAL); \
    } while (0)
481
virNetClientNewSSH(const char * nodename,const char * service,const char * binary,const char * username,bool noTTY,bool noVerify,const char * keyfile,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly)482 virNetClient *virNetClientNewSSH(const char *nodename,
483 const char *service,
484 const char *binary,
485 const char *username,
486 bool noTTY,
487 bool noVerify,
488 const char *keyfile,
489 virNetClientProxy proxy,
490 const char *netcatPath,
491 const char *socketPath,
492 const char *driverURI,
493 bool readonly)
494 {
495 virNetSocket *sock;
496 g_autofree char *command = NULL;
497
498 if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
499 driverURI, readonly)))
500 return NULL;
501
502 if (virNetSocketNewConnectSSH(nodename, service, binary, username, noTTY,
503 noVerify, keyfile, command, &sock) < 0)
504 return NULL;
505
506 return virNetClientNew(sock, NULL);
507 }
508
virNetClientNewLibSSH2(const char * host,const char * port,int family,const char * username,const char * privkeyPath,const char * knownHostsPath,const char * knownHostsVerify,const char * authMethods,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly,virConnectAuthPtr authPtr,virURI * uri)509 virNetClient *virNetClientNewLibSSH2(const char *host,
510 const char *port,
511 int family,
512 const char *username,
513 const char *privkeyPath,
514 const char *knownHostsPath,
515 const char *knownHostsVerify,
516 const char *authMethods,
517 virNetClientProxy proxy,
518 const char *netcatPath,
519 const char *socketPath,
520 const char *driverURI,
521 bool readonly,
522 virConnectAuthPtr authPtr,
523 virURI *uri)
524 {
525 virNetSocket *sock = NULL;
526 g_autofree char *command = NULL;
527 g_autofree char *homedir = NULL;
528 g_autofree char *confdir = NULL;
529 g_autofree char *knownhosts = NULL;
530 g_autofree char *privkey = NULL;
531
532 /* Use default paths for known hosts an public keys if not provided */
533 if (knownHostsPath) {
534 knownhosts = g_strdup(knownHostsPath);
535 } else {
536 confdir = virGetUserConfigDirectory();
537 knownhosts = g_strdup_printf("%s/known_hosts", confdir);
538 }
539
540 if (privkeyPath) {
541 privkey = g_strdup(privkeyPath);
542 } else {
543 homedir = virGetUserDirectory();
544 if (virNetClientFindDefaultSshKey(homedir, &privkey) < 0)
545 return NULL;
546 }
547
548 if (!authMethods) {
549 if (privkey)
550 authMethods = "agent,privkey,password,keyboard-interactive";
551 else
552 authMethods = "agent,password,keyboard-interactive";
553 }
554
555 DEFAULT_VALUE(host, "localhost");
556 DEFAULT_VALUE(port, "22");
557 DEFAULT_VALUE(username, "root");
558 DEFAULT_VALUE(knownHostsVerify, "normal");
559
560 if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
561 driverURI, readonly)))
562 return NULL;
563
564 if (virNetSocketNewConnectLibSSH2(host, port,
565 family,
566 username, privkey,
567 knownhosts, knownHostsVerify, authMethods,
568 command, authPtr, uri, &sock) != 0)
569 return NULL;
570
571 return virNetClientNew(sock, NULL);
572 }
573
virNetClientNewLibssh(const char * host,const char * port,int family,const char * username,const char * privkeyPath,const char * knownHostsPath,const char * knownHostsVerify,const char * authMethods,virNetClientProxy proxy,const char * netcatPath,const char * socketPath,const char * driverURI,bool readonly,virConnectAuthPtr authPtr,virURI * uri)574 virNetClient *virNetClientNewLibssh(const char *host,
575 const char *port,
576 int family,
577 const char *username,
578 const char *privkeyPath,
579 const char *knownHostsPath,
580 const char *knownHostsVerify,
581 const char *authMethods,
582 virNetClientProxy proxy,
583 const char *netcatPath,
584 const char *socketPath,
585 const char *driverURI,
586 bool readonly,
587 virConnectAuthPtr authPtr,
588 virURI *uri)
589 {
590 virNetSocket *sock = NULL;
591 g_autofree char *command = NULL;
592 g_autofree char *homedir = NULL;
593 g_autofree char *confdir = NULL;
594 g_autofree char *knownhosts = NULL;
595 g_autofree char *privkey = NULL;
596
597 /* Use default paths for known hosts an public keys if not provided */
598 if (knownHostsPath) {
599 knownhosts = g_strdup(knownHostsPath);
600 } else {
601 confdir = virGetUserConfigDirectory();
602 knownhosts = g_strdup_printf("%s/known_hosts", confdir);
603 }
604
605 if (privkeyPath) {
606 privkey = g_strdup(privkeyPath);
607 } else {
608 homedir = virGetUserDirectory();
609 if (virNetClientFindDefaultSshKey(homedir, &privkey) < 0)
610 return NULL;
611 }
612
613 if (!authMethods) {
614 if (privkey)
615 authMethods = "agent,privkey,password,keyboard-interactive";
616 else
617 authMethods = "agent,password,keyboard-interactive";
618 }
619
620 DEFAULT_VALUE(host, "localhost");
621 DEFAULT_VALUE(port, "22");
622 DEFAULT_VALUE(username, "root");
623 DEFAULT_VALUE(knownHostsVerify, "normal");
624
625 if (!(command = virNetClientSSHHelperCommand(proxy, netcatPath, socketPath,
626 driverURI, readonly)))
627 return NULL;
628
629 if (virNetSocketNewConnectLibssh(host, port,
630 family,
631 username, privkey,
632 knownhosts, knownHostsVerify, authMethods,
633 command, authPtr, uri, &sock) != 0)
634 return NULL;
635
636 return virNetClientNew(sock, NULL);
637 }
638 #undef DEFAULT_VALUE
639
virNetClientNewExternal(const char ** cmdargv)640 virNetClient *virNetClientNewExternal(const char **cmdargv)
641 {
642 virNetSocket *sock;
643
644 if (virNetSocketNewConnectExternal(cmdargv, &sock) < 0)
645 return NULL;
646
647 return virNetClientNew(sock, NULL);
648 }
649
650
/*
 * Register virNetClientIncomingEvent() as the socket callback for
 * readable events, unless that was already done.
 *
 * A reference on @client is taken for the callback (with
 * virObjectFreeCallback registered as its free function) and dropped
 * again if the registration fails.
 *
 * Returns 0 on success or if async IO was already enabled, -1 with an
 * error reported otherwise.
 */
int virNetClientRegisterAsyncIO(virNetClient *client)
{
    int rc;

    if (client->asyncIO)
        return 0;

    /* Set up a callback to listen on the socket data */
    virObjectRef(client);
    rc = virNetSocketAddIOCallback(client->sock,
                                   VIR_EVENT_HANDLE_READABLE,
                                   virNetClientIncomingEvent,
                                   client,
                                   virObjectFreeCallback);
    if (rc >= 0) {
        client->asyncIO = true;
        return 0;
    }

    virObjectUnref(client);
    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                   _("Unable to register async IO callback"));
    return -1;
}
672
673
/*
 * Attach a keepalive object to @client, unless one is already attached.
 *
 * Requires async IO to be registered first. The keepalive is created
 * with the client as its opaque data and virObjectFreeCallback as the
 * free function, so a client reference is taken on its behalf.
 *
 * Returns 0 on success or if a keepalive already exists, -1 on failure.
 */
int virNetClientRegisterKeepAlive(virNetClient *client)
{
    virKeepAlive *ka;

    if (client->keepalive)
        return 0;

    /* Keepalive protocol consists of async messages so it can only be used
     * if the client supports them */
    if (!client->asyncIO) {
        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
                       _("Unable to enable keepalives without async IO support"));
        return -1;
    }

    ka = virKeepAliveNew(-1, 0, client,
                         virNetClientKeepAliveSendCB,
                         virNetClientKeepAliveDeadCB,
                         virObjectFreeCallback);
    if (ka == NULL)
        return -1;

    /* keepalive object has a reference to client */
    virObjectRef(client);

    client->keepalive = ka;
    return 0;
}
701
702
/*
 * Return the file descriptor of the client's socket, as reported by
 * virNetSocketGetFD(), queried under the client lock.
 */
int virNetClientGetFD(virNetClient *client)
{
    int sockfd;

    virObjectLock(client);
    sockfd = virNetSocketGetFD(client->sock);
    virObjectUnlock(client);

    return sockfd;
}
711
712
/*
 * Return the result of virNetSocketDupFD() for the client's socket,
 * called under the client lock; @cloexec is passed through unchanged.
 */
int virNetClientDupFD(virNetClient *client, bool cloexec)
{
    int dupfd;

    virObjectLock(client);
    dupfd = virNetSocketDupFD(client->sock, cloexec);
    virObjectUnlock(client);

    return dupfd;
}
721
722
/*
 * Return the result of virNetSocketHasPassFD() for the client's socket,
 * queried under the client lock.
 */
bool virNetClientHasPassFD(virNetClient *client)
{
    bool canPass;

    virObjectLock(client);
    canPass = virNetSocketHasPassFD(client->sock);
    virObjectUnlock(client);

    return canPass;
}
731
732
virNetClientDispose(void * obj)733 void virNetClientDispose(void *obj)
734 {
735 virNetClient *client = obj;
736 size_t i;
737
738 PROBE(RPC_CLIENT_DISPOSE,
739 "client=%p", client);
740
741 if (client->closeFf)
742 client->closeFf(client->closeOpaque);
743
744 for (i = 0; i < client->nprograms; i++)
745 virObjectUnref(client->programs[i]);
746 g_free(client->programs);
747
748 g_main_loop_unref(client->eventLoop);
749 g_main_context_unref(client->eventCtx);
750
751 g_free(client->hostname);
752
753 if (client->sock)
754 virNetSocketRemoveIOCallback(client->sock);
755 virObjectUnref(client->sock);
756 virObjectUnref(client->tls);
757 #if WITH_SASL
758 virObjectUnref(client->sasl);
759 #endif
760
761 virNetMessageClear(&client->msg);
762 }
763
764
765 static void
virNetClientMarkClose(virNetClient * client,int reason)766 virNetClientMarkClose(virNetClient *client,
767 int reason)
768 {
769 VIR_DEBUG("client=%p, reason=%d", client, reason);
770
771 if (client->sock)
772 virNetSocketRemoveIOCallback(client->sock);
773
774 /* Don't override reason that's already set. */
775 if (!client->wantClose) {
776 if (!client->error)
777 client->error = virSaveLastError();
778 client->wantClose = true;
779 client->closeReason = reason;
780 }
781 }
782
783
/*
 * Tear down the connection of @client.
 *
 * Does nothing if the socket is already gone. Otherwise the socket, TLS
 * and SASL references are dropped, the pending close request and the
 * saved error are cleared, and the keepalive object is detached.
 *
 * The client lock is released and re-acquired inside this function
 * (see below), so the caller is expected to hold it on entry.
 */
static void
virNetClientCloseLocked(virNetClient *client)
{
    virKeepAlive *ka;

    VIR_DEBUG("client=%p, sock=%p, reason=%d", client, client->sock, client->closeReason);

    if (!client->sock)
        return;

    virObjectUnref(client->sock);
    client->sock = NULL;
    virObjectUnref(client->tls);
    client->tls = NULL;
#if WITH_SASL
    virObjectUnref(client->sasl);
    client->sasl = NULL;
#endif
    /* Take the keepalive out of the client; it is stopped and released
     * below, after the lock has been dropped */
    ka = g_steal_pointer(&client->keepalive);
    client->wantClose = false;

    virFreeError(client->error);
    client->error = NULL;

    if (ka || client->closeCb) {
        /* Snapshot everything needed for the callback while still locked */
        virNetClientCloseFunc closeCb = client->closeCb;
        void *closeOpaque = client->closeOpaque;
        int closeReason = client->closeReason;
        /* Hold an extra reference across the unlocked section so the
         * client stays alive until the lock is re-acquired */
        virObjectRef(client);
        virObjectUnlock(client);

        if (ka) {
            virKeepAliveStop(ka);
            virObjectUnref(ka);
        }
        if (closeCb)
            closeCb(client, closeReason, closeOpaque);

        virObjectLock(client);
        virObjectUnref(client);
    }
}
826
827
/*
 * Request that @client be closed for @reason.
 *
 * Nothing is done for a NULL client, a client without a socket, or a
 * client that already has a close pending; these checks are made before
 * the client lock is taken. Otherwise the close is recorded with
 * virNetClientMarkClose() and whoever is responsible for the socket is
 * notified.
 */
static void virNetClientCloseInternal(virNetClient *client,
                                      int reason)
{
    VIR_DEBUG("client=%p wantclose=%d", client, client ? client->wantClose : false);

    if (!client || !client->sock || client->wantClose)
        return;

    virObjectLock(client);

    virNetClientMarkClose(client, reason);

    /* If there is a thread polling for data on the socket, wake the thread up
     * otherwise try to pass the buck to a possibly waiting thread. If no
     * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
     * queue and close the client because we set client->wantClose.
     */
    if (!client->haveTheBuck)
        virNetClientIOEventLoopPassTheBuck(client, NULL);
    else
        g_main_loop_quit(client->eventLoop);

    virObjectUnlock(client);
}
857
858
/*
 * Close @client on behalf of the caller: forwards to
 * virNetClientCloseInternal() with VIR_CONNECT_CLOSE_REASON_CLIENT as
 * the reason. A NULL @client is accepted and ignored there.
 */
void virNetClientClose(virNetClient *client)
{
    virNetClientCloseInternal(client, VIR_CONNECT_CLOSE_REASON_CLIENT);
}
863
864
865 #if WITH_SASL
/*
 * Attach the SASL session @sasl to @client and to its socket.
 * The client takes its own reference on @sasl; everything happens under
 * the client lock.
 */
void virNetClientSetSASLSession(virNetClient *client,
                                virNetSASLSession *sasl)
{
    virNetSASLSession *session;

    virObjectLock(client);

    session = virObjectRef(sasl);
    client->sasl = session;
    virNetSocketSetSASLSession(client->sock, session);

    virObjectUnlock(client);
}
874 #endif
875
876
877 static gboolean
878 virNetClientIOEventTLS(int fd,
879 GIOCondition ev,
880 gpointer opaque);
881
882 static gboolean
virNetClientTLSHandshake(virNetClient * client)883 virNetClientTLSHandshake(virNetClient *client)
884 {
885 g_autoptr(GSource) G_GNUC_UNUSED source = NULL;
886 GIOCondition ev;
887 int ret;
888
889 ret = virNetTLSSessionHandshake(client->tls);
890
891 if (ret <= 0)
892 return FALSE;
893
894 if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
895 VIR_NET_TLS_HANDSHAKE_RECVING)
896 ev = G_IO_IN;
897 else
898 ev = G_IO_OUT;
899
900 source = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
901 ev,
902 client->eventCtx,
903 virNetClientIOEventTLS, client, NULL);
904
905 return TRUE;
906 }
907
908
909 static gboolean
virNetClientIOEventTLS(int fd G_GNUC_UNUSED,GIOCondition ev G_GNUC_UNUSED,gpointer opaque)910 virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
911 GIOCondition ev G_GNUC_UNUSED,
912 gpointer opaque)
913 {
914 virNetClient *client = opaque;
915
916 if (!virNetClientTLSHandshake(client))
917 g_main_loop_quit(client->eventLoop);
918
919 return G_SOURCE_REMOVE;
920 }
921
922
923 static gboolean
virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,GIOCondition ev G_GNUC_UNUSED,gpointer opaque)924 virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
925 GIOCondition ev G_GNUC_UNUSED,
926 gpointer opaque)
927 {
928 virNetClient *client = opaque;
929
930 g_main_loop_quit(client->eventLoop);
931
932 return G_SOURCE_REMOVE;
933 }
934
935
/*
 * Establish a TLS session on @client using the context @tls.
 *
 * Steps, all performed with the client lock held:
 *  1. create the session for the stored hostname and attach it to the
 *     socket;
 *  2. drive the handshake, running the client's main loop while
 *     handshake steps are waiting for socket IO;
 *  3. check the certificate with virNetTLSContextCheckCertificate();
 *  4. wait for the socket to become readable and read one confirmation
 *     byte, which must be '\1'.
 *
 * Returns 0 on success. On failure returns -1 and drops the client's
 * reference to the TLS session.
 */
int virNetClientSetTLSSession(virNetClient *client,
                              virNetTLSContext *tls)
{
    int ret;
    char buf[1];
    int len;
    g_autoptr(GSource) G_GNUC_UNUSED source = NULL;

#ifndef WIN32
    sigset_t oldmask, blockedsigs;

    /* Signals blocked while the main loop runs (see below) */
    sigemptyset(&blockedsigs);
# ifdef SIGWINCH
    sigaddset(&blockedsigs, SIGWINCH);
# endif
# ifdef SIGCHLD
    sigaddset(&blockedsigs, SIGCHLD);
# endif
    sigaddset(&blockedsigs, SIGPIPE);
#endif /* !WIN32 */

    virObjectLock(client);

    if (!(client->tls = virNetTLSSessionNew(tls,
                                            client->hostname)))
        goto error;

    virNetSocketSetTLSSession(client->sock, client->tls);

    /* Start from a clean error state so that the check after the
     * handshake only sees errors raised by the handshake itself */
    virResetLastError();
    if (virNetClientTLSHandshake(client)) {
#ifndef WIN32
        /* Block SIGWINCH from interrupting poll in curses programs,
         * then restore the original signal mask again immediately
         * after the call (RHBZ#567931). Same for SIGCHLD and SIGPIPE
         * at the suggestion of Paolo Bonzini and Daniel Berrange.
         */
        ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
#endif /* !WIN32 */

        /* Runs until a handshake step no longer needs to wait for IO
         * (virNetClientIOEventTLS quits the loop) */
        g_main_loop_run(client->eventLoop);

#ifndef WIN32
        ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
#endif /* !WIN32 */
    }

    if (virGetLastErrorCode() != VIR_ERR_OK)
        goto error;

    ret = virNetTLSContextCheckCertificate(tls, client->tls);

    if (ret < 0)
        goto error;

    /* At this point, the server is verifying _our_ certificate, IP address,
     * etc. If we make the grade, it will send us a '\1' byte.
     */

    source = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
                                        G_IO_IN,
                                        client->eventCtx,
                                        virNetClientIOEventTLSConfirm, client, NULL);

#ifndef WIN32
    /* Block SIGWINCH from interrupting poll in curses programs */
    ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
#endif /* !WIN32 */

    /* Runs until the socket is readable (virNetClientIOEventTLSConfirm
     * quits the loop) */
    g_main_loop_run(client->eventLoop);

#ifndef WIN32
    ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
#endif /* !WIN32 */

    len = virNetTLSSessionRead(client->tls, buf, 1);
    /* A read failure with ENOMSG is not reported as a system error; it
     * falls through to the generic verification failure below */
    if (len < 0 && errno != ENOMSG) {
        virReportSystemError(errno, "%s",
                             _("Unable to read TLS confirmation"));
        goto error;
    }
    if (len != 1 || buf[0] != '\1') {
        virReportError(VIR_ERR_RPC, "%s",
                       _("server verification (of our certificate or IP "
                         "address) failed"));
        goto error;
    }

    virObjectUnlock(client);
    return 0;

 error:
    virObjectUnref(client->tls);
    client->tls = NULL;
    virObjectUnlock(client);
    return -1;
}
1033
/*
 * Report whether @client has a TLS session or (when built with SASL
 * support) a SASL session attached. Checked under the client lock.
 */
bool virNetClientIsEncrypted(virNetClient *client)
{
    bool encrypted;

    virObjectLock(client);
    encrypted = client->tls != NULL;
#if WITH_SASL
    encrypted = encrypted || client->sasl != NULL;
#endif
    virObjectUnlock(client);

    return encrypted;
}
1047
1048
/*
 * Report whether @client is usable: it must be non-NULL, still have a
 * socket and have no close pending. The state is read under the client
 * lock.
 */
bool virNetClientIsOpen(virNetClient *client)
{
    bool isOpen;

    if (client == NULL)
        return false;

    virObjectLock(client);
    isOpen = client->sock != NULL && !client->wantClose;
    virObjectUnlock(client);

    return isOpen;
}
1061
1062
/*
 * Register @prog with @client.
 *
 * The program array is grown by one slot under the client lock and the
 * new last slot receives a fresh reference on @prog.
 *
 * Always returns 0.
 */
int virNetClientAddProgram(virNetClient *client,
                           virNetClientProgram *prog)
{
    size_t slot;

    virObjectLock(client);

    VIR_EXPAND_N(client->programs, client->nprograms, 1);
    slot = client->nprograms - 1;
    client->programs[slot] = virObjectRef(prog);

    virObjectUnlock(client);

    return 0;
}
1074
1075
/*
 * Register the stream @st with @client.
 *
 * The stream array is grown by one slot under the client lock and the
 * new last slot receives a fresh reference on @st.
 *
 * Always returns 0.
 */
int virNetClientAddStream(virNetClient *client,
                          virNetClientStream *st)
{
    size_t slot;

    virObjectLock(client);

    VIR_EXPAND_N(client->streams, client->nstreams, 1);
    slot = client->nstreams - 1;
    client->streams[slot] = virObjectRef(st);

    virObjectUnlock(client);

    return 0;
}
1087
1088
/*
 * Unregister the stream @st from @client.
 *
 * The first array entry equal to @st is deleted and the reference the
 * client held on it is dropped. Nothing happens if @st is not
 * registered. The whole operation runs under the client lock.
 */
void virNetClientRemoveStream(virNetClient *client,
                              virNetClientStream *st)
{
    size_t pos;

    virObjectLock(client);

    for (pos = 0; pos < client->nstreams; pos++) {
        if (client->streams[pos] != st)
            continue;

        VIR_DELETE_ELEMENT(client->streams, pos, client->nstreams);
        virObjectUnref(st);
        break;
    }

    virObjectUnlock(client);
}
1109
1110
/*
 * Return the result of virNetSocketLocalAddrStringSASL() for the
 * client's socket. The client lock is not taken here.
 */
const char *virNetClientLocalAddrStringSASL(virNetClient *client)
{
    return virNetSocketLocalAddrStringSASL(client->sock);
}
1115
/*
 * Return the result of virNetSocketRemoteAddrStringSASL() for the
 * client's socket. The client lock is not taken here.
 */
const char *virNetClientRemoteAddrStringSASL(virNetClient *client)
{
    return virNetSocketRemoteAddrStringSASL(client->sock);
}
1120
/*
 * Return the key size reported by virNetTLSSessionGetKeySize() for the
 * client's TLS session, or 0 if the client has no TLS session.
 * Queried under the client lock.
 */
int virNetClientGetTLSKeySize(virNetClient *client)
{
    int keySize;

    virObjectLock(client);
    keySize = client->tls ? virNetTLSSessionGetKeySize(client->tls) : 0;
    virObjectUnlock(client);

    return keySize;
}
1130
1131 static int
virNetClientCallDispatchReply(virNetClient * client)1132 virNetClientCallDispatchReply(virNetClient *client)
1133 {
1134 virNetClientCall *thecall;
1135
1136 /* Ok, definitely got an RPC reply now find
1137 out which waiting call is associated with it */
1138 thecall = client->waitDispatch;
1139 while (thecall &&
1140 !(thecall->msg->header.prog == client->msg.header.prog &&
1141 thecall->msg->header.vers == client->msg.header.vers &&
1142 thecall->msg->header.serial == client->msg.header.serial))
1143 thecall = thecall->next;
1144
1145 if (!thecall) {
1146 virReportError(VIR_ERR_RPC,
1147 _("no call waiting for reply with prog %d vers %d serial %d"),
1148 client->msg.header.prog, client->msg.header.vers, client->msg.header.serial);
1149 return -1;
1150 }
1151
1152 VIR_REALLOC_N(thecall->msg->buffer, client->msg.bufferLength);
1153
1154 memcpy(thecall->msg->buffer, client->msg.buffer, client->msg.bufferLength);
1155 memcpy(&thecall->msg->header, &client->msg.header, sizeof(client->msg.header));
1156 thecall->msg->bufferLength = client->msg.bufferLength;
1157 thecall->msg->bufferOffset = client->msg.bufferOffset;
1158
1159 thecall->msg->nfds = client->msg.nfds;
1160 thecall->msg->fds = g_steal_pointer(&client->msg.fds);
1161 client->msg.nfds = 0;
1162
1163 thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
1164
1165 return 0;
1166 }
1167
/*
 * Deliver the asynchronous message held in client->msg to the first
 * registered program for which virNetClientProgramMatches() is true.
 *
 * Returns 0 once the message was passed to
 * virNetClientProgramDispatch(), -1 if no registered program matches.
 */
static int
virNetClientCallDispatchMessage(virNetClient *client)
{
    size_t idx;

    for (idx = 0; idx < client->nprograms; idx++) {
        virNetClientProgram *candidate = client->programs[idx];

        if (!virNetClientProgramMatches(candidate, &client->msg))
            continue;

        virNetClientProgramDispatch(candidate, client, &client->msg);
        return 0;
    }

    VIR_DEBUG("No program found for event with prog=%d vers=%d",
              client->msg.header.prog, client->msg.header.vers);
    return -1;
}
1190
/*
 * Mark as complete every queued call that expects a reply and whose
 * message header carries the same prog, vers and serial as the
 * message currently held in client->msg.
 */
static void
virNetClientCallCompleteAllWaitingReply(virNetClient *client)
{
    virNetClientCall *cur = client->waitDispatch;

    while (cur) {
        bool sameRequest =
            cur->msg->header.prog == client->msg.header.prog &&
            cur->msg->header.vers == client->msg.header.vers &&
            cur->msg->header.serial == client->msg.header.serial;

        if (sameRequest && cur->expectReply)
            cur->mode = VIR_NET_CLIENT_MODE_COMPLETE;

        cur = cur->next;
    }
}
1203
/*
 * Process the stream packet currently held in client->msg.
 *
 * The target stream is the first entry of client->streams accepted
 * by virNetClientStreamMatches(). What happens next depends on the
 * status field of the packet header:
 *
 *  - VIR_NET_CONTINUE: the packet is queued on the stream and the
 *    oldest waiting "dummy" call (same prog/vers/serial, expecting a
 *    reply, own status VIR_NET_CONTINUE), if any, is completed.
 *  - VIR_NET_OK: confirms an abort/finish; the stream is marked
 *    closed and all matching calls awaiting a reply are completed.
 *  - VIR_NET_ERROR: the error is recorded against the stream and all
 *    matching calls awaiting a reply are completed.
 *
 * Returns 0 on success (including when no stream matches), -1 on
 * failure or on an unexpected status.
 */
static int
virNetClientCallDispatchStream(virNetClient *client)
{
    virNetClientStream *stream = NULL;
    virNetClientCall *waiter;
    size_t idx;

    /* Work out which stream the packet is addressed to */
    for (idx = 0; idx < client->nstreams; idx++) {
        virNetClientStream *candidate = client->streams[idx];

        if (virNetClientStreamMatches(candidate, &client->msg)) {
            stream = candidate;
            break;
        }
    }

    if (!stream) {
        VIR_DEBUG("No stream found for packet with prog=%d vers=%d serial=%u proc=%u",
                  client->msg.header.prog, client->msg.header.vers,
                  client->msg.header.serial, client->msg.header.proc);
        /* Not an error: stream packets may still arrive after the
         * stream has been shut down locally */
        return 0;
    }

    /* The status is one of
     *   - VIR_NET_OK       - no payload for streams
     *   - VIR_NET_ERROR    - followed by a remote_error struct
     *   - VIR_NET_CONTINUE - followed by a raw data packet
     */
    switch (client->msg.header.status) {
    case VIR_NET_CONTINUE:
        if (virNetClientStreamQueuePacket(stream, &client->msg) < 0)
            return -1;

        /* Oldest dummy call that is waiting for incoming data */
        waiter = client->waitDispatch;
        while (waiter &&
               !(waiter->msg->header.prog == client->msg.header.prog &&
                 waiter->msg->header.vers == client->msg.header.vers &&
                 waiter->msg->header.serial == client->msg.header.serial &&
                 waiter->expectReply &&
                 waiter->msg->header.status == VIR_NET_CONTINUE))
            waiter = waiter->next;

        if (waiter) {
            VIR_DEBUG("Got a new incoming stream data");
            waiter->mode = VIR_NET_CLIENT_MODE_COMPLETE;
        }
        return 0;

    case VIR_NET_OK:
        /* Oldest abort/finish call awaiting its confirmation */
        waiter = client->waitDispatch;
        while (waiter &&
               !(waiter->msg->header.prog == client->msg.header.prog &&
                 waiter->msg->header.vers == client->msg.header.vers &&
                 waiter->msg->header.serial == client->msg.header.serial &&
                 waiter->expectReply &&
                 waiter->msg->header.status != VIR_NET_CONTINUE))
            waiter = waiter->next;

        if (!waiter) {
            VIR_DEBUG("Got unexpected async stream finish confirmation");
            return -1;
        }

        VIR_DEBUG("Got a synchronous abort/finish confirm");

        /* The close state is derived from the status of the call that
         * was sent, not from the confirmation packet */
        virNetClientStreamSetClosed(stream,
                                    waiter->msg->header.status == VIR_NET_OK ?
                                    VIR_NET_CLIENT_STREAM_CLOSED_FINISHED :
                                    VIR_NET_CLIENT_STREAM_CLOSED_ABORTED);

        virNetClientCallCompleteAllWaitingReply(client);
        return 0;

    case VIR_NET_ERROR:
        /* There is no specific call, so the error goes to the stream */
        if (virNetClientStreamSetError(stream, &client->msg) < 0)
            return -1;

        virNetClientCallCompleteAllWaitingReply(client);
        return 0;

    default:
        VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
                 client->msg.header.serial, client->msg.header.proc,
                 client->msg.header.status);
        return -1;
    }

    return 0;
}
1298
1299
1300 static int
virNetClientCallDispatch(virNetClient * client)1301 virNetClientCallDispatch(virNetClient *client)
1302 {
1303 virNetMessage *response = NULL;
1304
1305 PROBE(RPC_CLIENT_MSG_RX,
1306 "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
1307 client, client->msg.bufferLength,
1308 client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
1309 client->msg.header.type, client->msg.header.status, client->msg.header.serial);
1310
1311 if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) {
1312 if (response &&
1313 virNetClientQueueNonBlocking(client, response) < 0) {
1314 VIR_WARN("Could not queue keepalive response");
1315 virNetMessageFree(response);
1316 }
1317 return 0;
1318 }
1319
1320 switch (client->msg.header.type) {
1321 case VIR_NET_REPLY: /* Normal RPC replies */
1322 case VIR_NET_REPLY_WITH_FDS: /* Normal RPC replies with FDs */
1323 return virNetClientCallDispatchReply(client);
1324
1325 case VIR_NET_MESSAGE: /* Async notifications */
1326 return virNetClientCallDispatchMessage(client);
1327
1328 case VIR_NET_STREAM: /* Stream protocol */
1329 case VIR_NET_STREAM_HOLE: /* Sparse stream protocol */
1330 return virNetClientCallDispatchStream(client);
1331
1332 case VIR_NET_CALL:
1333 case VIR_NET_CALL_WITH_FDS:
1334 default:
1335 virReportError(VIR_ERR_RPC,
1336 _("got unexpected RPC call prog %d vers %d proc %d type %d"),
1337 client->msg.header.prog, client->msg.header.vers,
1338 client->msg.header.proc, client->msg.header.type);
1339 return -1;
1340 }
1341 }
1342
1343
1344 static ssize_t
virNetClientIOWriteMessage(virNetClient * client,virNetClientCall * thecall)1345 virNetClientIOWriteMessage(virNetClient *client,
1346 virNetClientCall *thecall)
1347 {
1348 ssize_t ret = 0;
1349
1350 if (thecall->msg->bufferOffset < thecall->msg->bufferLength) {
1351 ret = virNetSocketWrite(client->sock,
1352 thecall->msg->buffer + thecall->msg->bufferOffset,
1353 thecall->msg->bufferLength - thecall->msg->bufferOffset);
1354 if (ret <= 0)
1355 return ret;
1356
1357 thecall->msg->bufferOffset += ret;
1358 }
1359
1360 if (thecall->msg->bufferOffset == thecall->msg->bufferLength) {
1361 size_t i;
1362 for (i = thecall->msg->donefds; i < thecall->msg->nfds; i++) {
1363 int rv;
1364 if ((rv = virNetSocketSendFD(client->sock, thecall->msg->fds[i])) < 0)
1365 return -1;
1366 if (rv == 0) /* Blocking */
1367 return 0;
1368 thecall->msg->donefds++;
1369 }
1370 virNetMessageClearPayload(thecall->msg);
1371 if (thecall->expectReply)
1372 thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
1373 else
1374 thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
1375 }
1376
1377 return ret;
1378 }
1379
1380
1381 static ssize_t
virNetClientIOHandleOutput(virNetClient * client)1382 virNetClientIOHandleOutput(virNetClient *client)
1383 {
1384 virNetClientCall *thecall = client->waitDispatch;
1385
1386 while (thecall &&
1387 thecall->mode != VIR_NET_CLIENT_MODE_WAIT_TX)
1388 thecall = thecall->next;
1389
1390 if (!thecall)
1391 return 0; /* This can happen if another thread raced with us and
1392 * completed the call between the time this thread woke
1393 * up from poll()ing and the time we locked the client
1394 */
1395
1396 while (thecall) {
1397 ssize_t ret = virNetClientIOWriteMessage(client, thecall);
1398 if (ret < 0)
1399 return ret;
1400
1401 if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
1402 return 0; /* Blocking write, to back to event loop */
1403
1404 thecall = thecall->next;
1405 }
1406
1407 return 0; /* No more calls to send, all done */
1408 }
1409
1410 static ssize_t
virNetClientIOReadMessage(virNetClient * client)1411 virNetClientIOReadMessage(virNetClient *client)
1412 {
1413 size_t wantData;
1414 ssize_t ret;
1415
1416 /* Start by reading length word */
1417 if (client->msg.bufferLength == 0) {
1418 client->msg.bufferLength = 4;
1419 client->msg.buffer = g_new0(char, client->msg.bufferLength);
1420 }
1421
1422 wantData = client->msg.bufferLength - client->msg.bufferOffset;
1423
1424 ret = virNetSocketRead(client->sock,
1425 client->msg.buffer + client->msg.bufferOffset,
1426 wantData);
1427 if (ret <= 0)
1428 return ret;
1429
1430 client->msg.bufferOffset += ret;
1431
1432 return ret;
1433 }
1434
1435
1436 static ssize_t
virNetClientIOHandleInput(virNetClient * client)1437 virNetClientIOHandleInput(virNetClient *client)
1438 {
1439 /* Read as much data as is available, until we get
1440 * EAGAIN
1441 */
1442 for (;;) {
1443 ssize_t ret;
1444
1445 if (client->msg.nfds == 0) {
1446 ret = virNetClientIOReadMessage(client);
1447
1448 if (ret < 0)
1449 return -1;
1450 if (ret == 0)
1451 return 0; /* Blocking on read */
1452 }
1453
1454 /* Check for completion of our goal */
1455 if (client->msg.bufferOffset == client->msg.bufferLength) {
1456 if (client->msg.bufferOffset == 4) {
1457 ret = virNetMessageDecodeLength(&client->msg);
1458 if (ret < 0)
1459 return -1;
1460
1461 /*
1462 * We'll carry on around the loop to immediately
1463 * process the message body, because it has probably
1464 * already arrived. Worst case, we'll get EAGAIN on
1465 * next iteration.
1466 */
1467 } else {
1468 if (virNetMessageDecodeHeader(&client->msg) < 0)
1469 return -1;
1470
1471 if (client->msg.header.type == VIR_NET_REPLY_WITH_FDS) {
1472 size_t i;
1473
1474 if (virNetMessageDecodeNumFDs(&client->msg) < 0)
1475 return -1;
1476
1477 for (i = client->msg.donefds; i < client->msg.nfds; i++) {
1478 int rv;
1479 if ((rv = virNetSocketRecvFD(client->sock, &(client->msg.fds[i]))) < 0)
1480 return -1;
1481 if (rv == 0) /* Blocking */
1482 break;
1483 client->msg.donefds++;
1484 }
1485
1486 if (client->msg.donefds < client->msg.nfds) {
1487 /* Because DecodeHeader/NumFDs reset bufferOffset, we
1488 * put it back to what it was, so everything works
1489 * again next time we run this method
1490 */
1491 client->msg.bufferOffset = client->msg.bufferLength;
1492 return 0; /* Blocking on more fds */
1493 }
1494 }
1495
1496 ret = virNetClientCallDispatch(client);
1497 virNetMessageClear(&client->msg);
1498 /*
1499 * We've completed one call, but we don't want to
1500 * spin around the loop forever if there are many
1501 * incoming async events, or replies for other
1502 * thread's RPC calls. We want to get out & let
1503 * any other thread take over as soon as we've
1504 * got our reply. When SASL is active though, we
1505 * may have read more data off the wire than we
1506 * initially wanted & cached it in memory. In this
1507 * case, poll() would not detect that there is more
1508 * ready todo.
1509 *
1510 * So if SASL is active *and* some SASL data is
1511 * already cached, then we'll process that now,
1512 * before returning.
1513 */
1514 if (ret == 0 &&
1515 virNetSocketHasCachedData(client->sock))
1516 continue;
1517 return ret;
1518 }
1519 }
1520 }
1521 }
1522
1523
/*
 * Per-call callback that accumulates the I/O conditions to watch.
 *
 * @opaque points to a GIOCondition; G_IO_IN is added for a call
 * waiting to receive and G_IO_OUT for a call waiting to transmit.
 *
 * Always returns false.
 */
static bool
virNetClientIOEventLoopPollEvents(virNetClientCall *call,
                                  void *opaque)
{
    GIOCondition *wanted = opaque;

    switch (call->mode) {
    case VIR_NET_CLIENT_MODE_WAIT_RX:
        *wanted |= G_IO_IN;
        break;

    case VIR_NET_CLIENT_MODE_WAIT_TX:
        *wanted |= G_IO_OUT;
        break;

    default:
        break;
    }

    return false;
}
1536
1537
/*
 * Per-call callback passed to virNetClientCallRemovePredicate() to
 * deal with completed calls.
 *
 * @opaque is the caller's own call (or NULL), which is always left
 * alone. Calls that are not complete are left alone as well.
 *
 * Returns true for every other completed call, false otherwise.
 */
static bool
virNetClientIOEventLoopRemoveDone(virNetClientCall *call,
                                  void *opaque)
{
    virNetClientCall *self = opaque;

    if (call == self ||
        call->mode != VIR_NET_CLIENT_MODE_COMPLETE)
        return false;

    /*
     * A completed call that still has a thread attached gets its
     * thread woken up; one without a thread is freed right here,
     * which should only be the case for calls without replies.
     *
     * A woken thread does not actually run until we release our
     * mutex a short while later.
     */
    if (!call->haveThread) {
        VIR_DEBUG("Removing completed call %p", call);
        if (call->expectReply)
            VIR_WARN("Got a call expecting a reply but without a waiting thread");
        virCondDestroy(&call->cond);
        virNetMessageFree(call->msg);
        VIR_FREE(call);
    } else {
        VIR_DEBUG("Waking up sleep %p", call);
        virCondSignal(&call->cond);
    }

    return true;
}
1573
1574
1575 static void
virNetClientIODetachNonBlocking(virNetClientCall * call)1576 virNetClientIODetachNonBlocking(virNetClientCall *call)
1577 {
1578 VIR_DEBUG("Keeping unfinished non-blocking call %p in the queue", call);
1579 call->haveThread = false;
1580 }
1581
1582
1583 static bool
virNetClientIOEventLoopRemoveAll(virNetClientCall * call,void * opaque)1584 virNetClientIOEventLoopRemoveAll(virNetClientCall *call,
1585 void *opaque)
1586 {
1587 virNetClientCall *thiscall = opaque;
1588
1589 if (call == thiscall)
1590 return false;
1591
1592 VIR_DEBUG("Removing call %p", call);
1593 virCondDestroy(&call->cond);
1594 virNetMessageFree(call->msg);
1595 VIR_FREE(call);
1596 return true;
1597 }
1598
1599
1600 static void
virNetClientIOEventLoopPassTheBuck(virNetClient * client,virNetClientCall * thiscall)1601 virNetClientIOEventLoopPassTheBuck(virNetClient *client,
1602 virNetClientCall *thiscall)
1603 {
1604 virNetClientCall *tmp = client->waitDispatch;
1605
1606 VIR_DEBUG("Giving up the buck %p", thiscall);
1607
1608 /* See if someone else is still waiting
1609 * and if so, then pass the buck ! */
1610 while (tmp) {
1611 if (tmp != thiscall && tmp->haveThread) {
1612 VIR_DEBUG("Passing the buck to %p", tmp);
1613 virCondSignal(&tmp->cond);
1614 return;
1615 }
1616 tmp = tmp->next;
1617 }
1618 client->haveTheBuck = false;
1619
1620 VIR_DEBUG("No thread to pass the buck to");
1621 if (client->wantClose) {
1622 virNetClientCloseLocked(client);
1623 virNetClientCallRemovePredicate(&client->waitDispatch,
1624 virNetClientIOEventLoopRemoveAll,
1625 thiscall);
1626 }
1627 }
1628
1629
1630 struct virNetClientIOEventData {
1631 virNetClient *client;
1632 GIOCondition rev;
1633 };
1634
1635 static gboolean
virNetClientIOEventFD(int fd G_GNUC_UNUSED,GIOCondition ev,gpointer opaque)1636 virNetClientIOEventFD(int fd G_GNUC_UNUSED,
1637 GIOCondition ev,
1638 gpointer opaque)
1639 {
1640 struct virNetClientIOEventData *data = opaque;
1641 data->rev = ev;
1642 g_main_loop_quit(data->client->eventLoop);
1643 return G_SOURCE_REMOVE;
1644 }
1645
1646
1647 /*
1648 * Process all calls pending dispatch/receive until we
1649 * get a reply to our own call. Then quit and pass the buck
1650 * to someone else.
1651 *
1652 * Returns 1 if the call was queued and will be completed later (only
1653 * for nonBlock == true), 0 if the call was completed and -1 on error.
1654 */
virNetClientIOEventLoop(virNetClient * client,virNetClientCall * thiscall)1655 static int virNetClientIOEventLoop(virNetClient *client,
1656 virNetClientCall *thiscall)
1657 {
1658 bool error = false;
1659 int closeReason;
1660
1661 for (;;) {
1662 #ifndef WIN32
1663 sigset_t oldmask, blockedsigs;
1664 #endif /* !WIN32 */
1665 int timeout = -1;
1666 virNetMessage *msg = NULL;
1667 g_autoptr(GSource) G_GNUC_UNUSED source = NULL;
1668 GIOCondition ev = 0;
1669 struct virNetClientIOEventData data = {
1670 .client = client,
1671 .rev = 0,
1672 };
1673
1674 /* If we have existing SASL decoded data we don't want to sleep in
1675 * the poll(), just check if any other FDs are also ready.
1676 * If the connection is going to be closed, we don't want to sleep in
1677 * poll() either.
1678 */
1679 if (virNetSocketHasCachedData(client->sock) || client->wantClose)
1680 timeout = 0;
1681
1682 /* If we are non-blocking, then we don't want to sleep in poll() */
1683 if (thiscall->nonBlock)
1684 timeout = 0;
1685
1686 /* Limit timeout so that we can send keepalive request in time */
1687 if (timeout == -1)
1688 timeout = virKeepAliveTimeout(client->keepalive);
1689
1690 /* Calculate poll events for calls */
1691 virNetClientCallMatchPredicate(client->waitDispatch,
1692 virNetClientIOEventLoopPollEvents,
1693 &ev);
1694
1695 /* We have to be prepared to receive stream data
1696 * regardless of whether any of the calls waiting
1697 * for dispatch are for streams.
1698 */
1699 if (client->nstreams)
1700 ev |= G_IO_IN;
1701
1702 source = virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
1703 ev,
1704 client->eventCtx,
1705 virNetClientIOEventFD, &data, NULL);
1706
1707 /* Release lock while poll'ing so other threads
1708 * can stuff themselves on the queue */
1709 virObjectUnlock(client);
1710
1711 #ifndef WIN32
1712 /* Block SIGWINCH from interrupting poll in curses programs,
1713 * then restore the original signal mask again immediately
1714 * after the call (RHBZ#567931). Same for SIGCHLD and SIGPIPE
1715 * at the suggestion of Paolo Bonzini and Daniel Berrange.
1716 */
1717 sigemptyset(&blockedsigs);
1718 # ifdef SIGWINCH
1719 sigaddset(&blockedsigs, SIGWINCH);
1720 # endif
1721 # ifdef SIGCHLD
1722 sigaddset(&blockedsigs, SIGCHLD);
1723 # endif
1724 sigaddset(&blockedsigs, SIGPIPE);
1725
1726 ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
1727 #endif /* !WIN32 */
1728
1729 g_main_loop_run(client->eventLoop);
1730
1731 #ifndef WIN32
1732 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
1733 #endif /* !WIN32 */
1734
1735 virObjectLock(client);
1736
1737 if (virKeepAliveTrigger(client->keepalive, &msg)) {
1738 virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
1739 } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
1740 VIR_WARN("Could not queue keepalive request");
1741 virNetMessageFree(msg);
1742 }
1743
1744 /* If we have existing SASL decoded data, pretend
1745 * the socket became readable so we consume it
1746 */
1747 if (virNetSocketHasCachedData(client->sock))
1748 data.rev |= G_IO_IN;
1749
1750 /* If wantClose flag is set, pretend there was an error on the socket,
1751 * but still read and process any data we received so far.
1752 */
1753 if (client->wantClose)
1754 error = true;
1755
1756 if (data.rev & G_IO_HUP)
1757 closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
1758 else
1759 closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;
1760
1761 if (data.rev & G_IO_OUT) {
1762 if (virNetClientIOHandleOutput(client) < 0) {
1763 virNetClientMarkClose(client, closeReason);
1764 error = true;
1765 /* Fall through to process any pending data. */
1766 }
1767 }
1768
1769 if (data.rev & G_IO_IN) {
1770 if (virNetClientIOHandleInput(client) < 0) {
1771 virNetClientMarkClose(client, closeReason);
1772 error = true;
1773 /* Fall through to process any pending data. */
1774 }
1775 }
1776
1777 /* Iterate through waiting calls and if any are
1778 * complete, remove them from the dispatch list.
1779 */
1780 virNetClientCallRemovePredicate(&client->waitDispatch,
1781 virNetClientIOEventLoopRemoveDone,
1782 thiscall);
1783
1784 /* Now see if *we* are done */
1785 if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
1786 virNetClientCallRemove(&client->waitDispatch, thiscall);
1787 virNetClientIOEventLoopPassTheBuck(client, thiscall);
1788 return 0;
1789 }
1790
1791 /* We're not done, but we're non-blocking; keep the call queued */
1792 if (thiscall->nonBlock) {
1793 virNetClientIODetachNonBlocking(thiscall);
1794 virNetClientIOEventLoopPassTheBuck(client, thiscall);
1795 return 1;
1796 }
1797
1798 if (error)
1799 goto error;
1800
1801 if (data.rev & G_IO_HUP) {
1802 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
1803 _("received hangup event on socket"));
1804 virNetClientMarkClose(client, closeReason);
1805 goto error;
1806 }
1807 if (data.rev & G_IO_ERR) {
1808 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
1809 _("received error event on socket"));
1810 virNetClientMarkClose(client, closeReason);
1811 goto error;
1812 }
1813 }
1814
1815 error:
1816 if (client->error) {
1817 VIR_DEBUG("error on socket: %s", client->error->message);
1818 virSetError(client->error);
1819 }
1820 virNetClientCallRemove(&client->waitDispatch, thiscall);
1821 virNetClientIOEventLoopPassTheBuck(client, thiscall);
1822 return -1;
1823 }
1824
1825
1826 static bool
virNetClientIOUpdateEvents(virNetClientCall * call,void * opaque)1827 virNetClientIOUpdateEvents(virNetClientCall *call,
1828 void *opaque)
1829 {
1830 int *events = opaque;
1831
1832 if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
1833 *events |= VIR_EVENT_HANDLE_WRITABLE;
1834
1835 return false;
1836 }
1837
1838
/*
 * Update the set of events the socket's I/O callback is
 * registered for.
 *
 * With @enableCallback set, the socket is watched for readability
 * and, if any queued call is waiting to transmit, for writability.
 * Without it the event set is empty. Nothing is done when a close
 * of the client has been requested.
 */
static void
virNetClientIOUpdateCallback(virNetClient *client,
                             bool enableCallback)
{
    int events = enableCallback ? VIR_EVENT_HANDLE_READABLE : 0;

    if (client->wantClose)
        return;

    if (enableCallback)
        virNetClientCallMatchPredicate(client->waitDispatch,
                                       virNetClientIOUpdateEvents,
                                       &events);

    virNetSocketUpdateIOCallback(client->sock, events);
}
1856
1857
1858 /*
1859 * This function sends a message to remote server and awaits a reply
1860 *
1861 * NB. This does not free the args structure (not desirable, since you
1862 * often want this allocated on the stack or else it contains strings
1863 * which come from the user). It does however free any intermediate
1864 * results, eg. the error structure if there is one.
1865 *
1866 * NB(2). Make sure to memset (&ret, 0, sizeof(ret)) before calling,
1867 * else Bad Things will happen in the XDR code.
1868 *
1869 * NB(3) You must have the client lock before calling this
1870 *
1871 * NB(4) This is very complicated. Multiple threads are allowed to
1872 * use the client for RPC at the same time. Obviously only one of
1873 * them can. So if someone's using the socket, other threads are put
1874 * to sleep on condition variables. The existing thread may completely
1875 * send & receive their RPC call/reply while they're asleep. Or it
1876 * may only get around to dealing with sending the call. Or it may
1877 * get around to neither. So upon waking up from slumber, the other
1878 * thread may or may not have more work todo.
1879 *
1880 * We call this dance 'passing the buck'
1881 *
1882 * https://en.wikipedia.org/wiki/Passing_the_buck
1883 *
1884 * "Buck passing or passing the buck is the action of transferring
1885 * responsibility or blame unto another person. It is also used as
1886 * a strategy in power politics when the actions of one country/
1887 * nation are blamed on another, providing an opportunity for war."
1888 *
1889 * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller
1890 * must *NOT* free it, if this returns '1' (ie partial send).
1891 *
1892 * NB(6) The following input states are valid if *no* threads
1893 * are currently executing this method
1894 *
1895 * - waitDispatch == NULL,
1896 * - waitDispatch != NULL, waitDispatch.nonBlock == true
1897 *
1898 * The following input states are valid, if n threads are currently
1899 * executing
1900 *
1901 * - waitDispatch != NULL
1902 * - 0 or 1 waitDispatch.nonBlock == false, without any threads
1903 * - 0 or more waitDispatch.nonBlock == false, with threads
1904 *
1905 * The following output states are valid when all threads are done
1906 *
1907 * - waitDispatch == NULL,
1908 * - waitDispatch != NULL, waitDispatch.nonBlock == true
1909 *
1910 * NB(7) Don't Panic!
1911 *
1912 * Returns 1 if the call was queued and will be completed later (only
1913 * for nonBlock == true), 0 if the call was completed and -1 on error.
1914 */
virNetClientIO(virNetClient * client,virNetClientCall * thiscall)1915 static int virNetClientIO(virNetClient *client,
1916 virNetClientCall *thiscall)
1917 {
1918 int rv = -1;
1919
1920 VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p",
1921 thiscall->msg->header.prog,
1922 thiscall->msg->header.vers,
1923 thiscall->msg->header.serial,
1924 thiscall->msg->header.proc,
1925 thiscall->msg->header.type,
1926 thiscall->msg->bufferLength,
1927 client->waitDispatch);
1928
1929 /* Stick ourselves on the end of the wait queue */
1930 virNetClientCallQueue(&client->waitDispatch, thiscall);
1931
1932 /* Check to see if another thread is dispatching */
1933 if (client->haveTheBuck) {
1934 /* Force other thread to wakeup from poll */
1935 g_main_loop_quit(client->eventLoop);
1936
1937 /* If we are non-blocking, detach the thread and keep the call in the
1938 * queue. */
1939 if (thiscall->nonBlock) {
1940 virNetClientIODetachNonBlocking(thiscall);
1941 rv = 1;
1942 goto cleanup;
1943 }
1944
1945 VIR_DEBUG("Going to sleep head=%p call=%p",
1946 client->waitDispatch, thiscall);
1947 /* Go to sleep while other thread is working... */
1948 if (virCondWait(&thiscall->cond, &client->parent.lock) < 0) {
1949 virNetClientCallRemove(&client->waitDispatch, thiscall);
1950 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
1951 _("failed to wait on condition"));
1952 return -1;
1953 }
1954
1955 VIR_DEBUG("Woken up from sleep head=%p call=%p",
1956 client->waitDispatch, thiscall);
1957 /* Three reasons we can be woken up
1958 * 1. Other thread has got our reply ready for us
1959 * 2. Other thread is all done, and it is our turn to
1960 * be the dispatcher to finish waiting for
1961 * our reply
1962 */
1963 if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
1964 rv = 0;
1965 /*
1966 * We avoided catching the buck and our reply is ready !
1967 * We've already had 'thiscall' removed from the list
1968 * so just need to (maybe) handle errors & free it
1969 */
1970 goto cleanup;
1971 }
1972
1973 /* Grr, someone passed the buck to us ... */
1974 } else {
1975 client->haveTheBuck = true;
1976 }
1977
1978 VIR_DEBUG("We have the buck head=%p call=%p",
1979 client->waitDispatch, thiscall);
1980
1981 /*
1982 * The buck stops here!
1983 *
1984 * At this point we're about to own the dispatch
1985 * process...
1986 */
1987
1988 /*
1989 * Avoid needless wake-ups of the event loop in the
1990 * case where this call is being made from a different
1991 * thread than the event loop. These wake-ups would
1992 * cause the event loop thread to be blocked on the
1993 * mutex for the duration of the call
1994 */
1995 virNetClientIOUpdateCallback(client, false);
1996
1997 rv = virNetClientIOEventLoop(client, thiscall);
1998
1999 if (client->sock)
2000 virNetClientIOUpdateCallback(client, true);
2001
2002 cleanup:
2003 VIR_DEBUG("All done with our call head=%p call=%p rv=%d",
2004 client->waitDispatch, thiscall, rv);
2005 return rv;
2006 }
2007
2008
/*
 * Socket I/O callback: handle @events reported for @sock on behalf
 * of the client passed as @opaque.
 *
 * Nothing is processed when the client has no socket, when another
 * thread currently holds the buck, or when a close has already been
 * requested. Otherwise pending output and input are handled, and
 * completed calls are removed or their threads signalled. A hangup
 * or error event marks the client for closing.
 *
 * On the way out, if a close is wanted and no thread holds the
 * buck, the client is closed and all queued calls are removed.
 */
void
virNetClientIncomingEvent(virNetSocket *sock,
                          int events,
                          void *opaque)
{
    virNetClient *client = opaque;
    int closeReason;

    virObjectLock(client);

    VIR_DEBUG("client=%p wantclose=%d", client, client ? client->wantClose : false);

    if (!client->sock)
        goto done;

    /* A thread inside the event loop is already taking care of I/O,
     * or the connection is on its way down */
    if (client->haveTheBuck || client->wantClose)
        goto done;

    VIR_DEBUG("Event fired %p %d", sock, events);

    closeReason = (events & VIR_EVENT_HANDLE_HANGUP) ?
        VIR_CONNECT_CLOSE_REASON_EOF :
        VIR_CONNECT_CLOSE_REASON_ERROR;

    if ((events & VIR_EVENT_HANDLE_WRITABLE) &&
        virNetClientIOHandleOutput(client) < 0)
        virNetClientMarkClose(client, closeReason);

    if ((events & VIR_EVENT_HANDLE_READABLE) &&
        virNetClientIOHandleInput(client) < 0)
        virNetClientMarkClose(client, closeReason);

    if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
        VIR_DEBUG("VIR_EVENT_HANDLE_HANGUP or "
                  "VIR_EVENT_HANDLE_ERROR encountered");
        virNetClientMarkClose(client, closeReason);
        goto done;
    }

    /* Remove completed calls or signal their threads. */
    virNetClientCallRemovePredicate(&client->waitDispatch,
                                    virNetClientIOEventLoopRemoveDone,
                                    NULL);
    virNetClientIOUpdateCallback(client, true);

 done:
    if (client->wantClose && !client->haveTheBuck) {
        virNetClientCloseLocked(client);
        virNetClientCallRemovePredicate(&client->waitDispatch,
                                        virNetClientIOEventLoopRemoveAll,
                                        NULL);
    }

    virObjectUnlock(client);
}
2065
2066
2067 static virNetClientCall *
virNetClientCallNew(virNetMessage * msg,bool expectReply,bool nonBlock)2068 virNetClientCallNew(virNetMessage *msg,
2069 bool expectReply,
2070 bool nonBlock)
2071 {
2072 virNetClientCall *call = NULL;
2073
2074 if (expectReply &&
2075 (msg->bufferLength != 0) &&
2076 (msg->header.status == VIR_NET_CONTINUE)) {
2077 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2078 _("Attempt to send an asynchronous message with"
2079 " a synchronous reply"));
2080 goto error;
2081 }
2082
2083 if (expectReply && nonBlock) {
2084 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2085 _("Attempt to send a non-blocking message with"
2086 " a synchronous reply"));
2087 goto error;
2088 }
2089
2090 call = g_new0(virNetClientCall, 1);
2091
2092 if (virCondInit(&call->cond) < 0) {
2093 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2094 _("cannot initialize condition variable"));
2095 goto error;
2096 }
2097
2098 msg->donefds = 0;
2099 if (msg->bufferLength)
2100 call->mode = VIR_NET_CLIENT_MODE_WAIT_TX;
2101 else
2102 call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
2103 call->msg = msg;
2104 call->expectReply = expectReply;
2105 call->nonBlock = nonBlock;
2106
2107 VIR_DEBUG("New call %p: msg=%p, expectReply=%d, nonBlock=%d",
2108 call, msg, expectReply, nonBlock);
2109
2110 return call;
2111
2112 error:
2113 VIR_FREE(call);
2114 return NULL;
2115 }
2116
2117
2118 static int
virNetClientQueueNonBlocking(virNetClient * client,virNetMessage * msg)2119 virNetClientQueueNonBlocking(virNetClient *client,
2120 virNetMessage *msg)
2121 {
2122 virNetClientCall *call;
2123
2124 PROBE(RPC_CLIENT_MSG_TX_QUEUE,
2125 "client=%p len=%zu prog=%u vers=%u proc=%u"
2126 " type=%u status=%u serial=%u",
2127 client, msg->bufferLength,
2128 msg->header.prog, msg->header.vers, msg->header.proc,
2129 msg->header.type, msg->header.status, msg->header.serial);
2130
2131 if (!(call = virNetClientCallNew(msg, false, true)))
2132 return -1;
2133
2134 virNetClientCallQueue(&client->waitDispatch, call);
2135 return 0;
2136 }
2137
2138
2139 /*
2140 * Returns 1 if the call was queued and will be completed later (only
2141 * for nonBlock == true), 0 if the call was completed and -1 on error.
2142 */
virNetClientSendInternal(virNetClient * client,virNetMessage * msg,bool expectReply,bool nonBlock)2143 static int virNetClientSendInternal(virNetClient *client,
2144 virNetMessage *msg,
2145 bool expectReply,
2146 bool nonBlock)
2147 {
2148 virNetClientCall *call;
2149 int ret = -1;
2150
2151 PROBE(RPC_CLIENT_MSG_TX_QUEUE,
2152 "client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
2153 client, msg->bufferLength,
2154 msg->header.prog, msg->header.vers, msg->header.proc,
2155 msg->header.type, msg->header.status, msg->header.serial);
2156
2157 if (!client->sock || client->wantClose) {
2158 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
2159 _("client socket is closed"));
2160 return -1;
2161 }
2162
2163 if (!(call = virNetClientCallNew(msg, expectReply, nonBlock)))
2164 return -1;
2165
2166 call->haveThread = true;
2167 ret = virNetClientIO(client, call);
2168
2169 /* If queued, the call will be finished and freed later by another thread;
2170 * we're done. */
2171 if (ret == 1)
2172 return 1;
2173
2174 virCondDestroy(&call->cond);
2175 VIR_FREE(call);
2176 return ret;
2177 }
2178
2179
2180 /*
2181 * @msg: a message allocated on heap or stack
2182 *
2183 * Send a message synchronously, and wait for the reply synchronously
2184 *
2185 * The caller is responsible for free'ing @msg if it was allocated
2186 * on the heap
2187 *
2188 * Returns 0 on success, -1 on failure
2189 */
virNetClientSendWithReply(virNetClient * client,virNetMessage * msg)2190 int virNetClientSendWithReply(virNetClient *client,
2191 virNetMessage *msg)
2192 {
2193 int ret;
2194 virObjectLock(client);
2195 ret = virNetClientSendInternal(client, msg, true, false);
2196 virObjectUnlock(client);
2197 if (ret < 0)
2198 return -1;
2199 return 0;
2200 }
2201
2202
2203 /*
2204 * @msg: a message allocated on the heap.
2205 *
2206 * Send a message asynchronously, without any reply
2207 *
2208 * The caller is responsible for free'ing @msg, *except* if
2209 * this method returns 1.
2210 *
2211 * Returns 1 if the message was queued and will be completed later (only
2212 * for nonBlock == true), 0 if the message was completed and -1 on error.
2213 */
virNetClientSendNonBlock(virNetClient * client,virNetMessage * msg)2214 int virNetClientSendNonBlock(virNetClient *client,
2215 virNetMessage *msg)
2216 {
2217 int ret;
2218 virObjectLock(client);
2219 ret = virNetClientSendInternal(client, msg, false, true);
2220 virObjectUnlock(client);
2221 return ret;
2222 }
2223
2224 /*
2225 * @msg: a message allocated on heap or stack
2226 *
2227 * Send a message synchronously, and wait for the reply synchronously if
2228 * message is dummy (just to wait for incoming data) or abort/finish message.
2229 *
2230 * The caller is responsible for free'ing @msg if it was allocated
2231 * on the heap
2232 *
2233 * Returns 0 on success, -1 on failure
2234 */
virNetClientSendStream(virNetClient * client,virNetMessage * msg,virNetClientStream * st)2235 int virNetClientSendStream(virNetClient *client,
2236 virNetMessage *msg,
2237 virNetClientStream *st)
2238 {
2239 int ret = -1;
2240 bool expectReply = !msg->bufferLength ||
2241 msg->header.status != VIR_NET_CONTINUE;
2242
2243 virObjectLock(client);
2244
2245 if (virNetClientStreamCheckState(st) < 0)
2246 goto cleanup;
2247
2248 /* Check for EOF only if we are going to wait for incoming data */
2249 if (!msg->bufferLength && virNetClientStreamEOF(st)) {
2250 ret = 0;
2251 goto cleanup;
2252 }
2253
2254 if (virNetClientSendInternal(client, msg, expectReply, false) < 0)
2255 goto cleanup;
2256
2257 if (expectReply && virNetClientStreamCheckSendStatus(st, msg) < 0)
2258 goto cleanup;
2259
2260 ret = 0;
2261
2262 cleanup:
2263 virObjectUnlock(client);
2264
2265 return ret;
2266 }
2267