1 /*
2  * Copyright 2004-2021 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 
12 #include <stdio.h>
13 #include <errno.h>
14 #include <bzlib.h>
15 #include <sys/stat.h>
16 #include <sys/types.h>
17 
18 #include <crm/crm.h>
19 #include <crm/msg_xml.h>
20 #include <crm/common/ipc.h>
21 #include <crm/common/ipc_internal.h>
22 #include "crmcommon_private.h"
23 
24 /* Evict clients whose event queue grows this large (by default) */
25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26 
27 static GHashTable *client_connections = NULL;
28 
29 /*!
30  * \internal
31  * \brief Count IPC clients
32  *
33  * \return Number of active IPC client connections
34  */
35 guint
pcmk__ipc_client_count()36 pcmk__ipc_client_count()
37 {
38     return client_connections? g_hash_table_size(client_connections) : 0;
39 }
40 
41 /*!
42  * \internal
43  * \brief Execute a function for each active IPC client connection
44  *
45  * \param[in] func       Function to call
46  * \param[in] user_data  Pointer to pass to function
47  *
48  * \note The parameters are the same as for g_hash_table_foreach().
49  */
50 void
pcmk__foreach_ipc_client(GHFunc func,gpointer user_data)51 pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52 {
53     if ((func != NULL) && (client_connections != NULL)) {
54         g_hash_table_foreach(client_connections, func, user_data);
55     }
56 }
57 
58 pcmk__client_t *
pcmk__find_client(qb_ipcs_connection_t * c)59 pcmk__find_client(qb_ipcs_connection_t *c)
60 {
61     if (client_connections) {
62         return g_hash_table_lookup(client_connections, c);
63     }
64 
65     crm_trace("No client found for %p", c);
66     return NULL;
67 }
68 
69 pcmk__client_t *
pcmk__find_client_by_id(const char * id)70 pcmk__find_client_by_id(const char *id)
71 {
72     gpointer key;
73     pcmk__client_t *client;
74     GHashTableIter iter;
75 
76     if (client_connections && id) {
77         g_hash_table_iter_init(&iter, client_connections);
78         while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79             if (strcmp(client->id, id) == 0) {
80                 return client;
81             }
82         }
83     }
84 
85     crm_trace("No client found with id=%s", id);
86     return NULL;
87 }
88 
89 /*!
90  * \internal
91  * \brief Get a client identifier for use in log messages
92  *
93  * \param[in] c  Client
94  *
95  * \return Client's name, client's ID, or a string literal, as available
96  * \note This is intended to be used in format strings like "client %s".
97  */
98 const char *
pcmk__client_name(pcmk__client_t * c)99 pcmk__client_name(pcmk__client_t *c)
100 {
101     if (c == NULL) {
102         return "(unspecified)";
103 
104     } else if (c->name != NULL) {
105         return c->name;
106 
107     } else if (c->id != NULL) {
108         return c->id;
109 
110     } else {
111         return "(unidentified)";
112     }
113 }
114 
115 void
pcmk__client_cleanup(void)116 pcmk__client_cleanup(void)
117 {
118     if (client_connections != NULL) {
119         int active = g_hash_table_size(client_connections);
120 
121         if (active) {
122             crm_err("Exiting with %d active IPC client%s",
123                     active, pcmk__plural_s(active));
124         }
125         g_hash_table_destroy(client_connections); client_connections = NULL;
126     }
127 }
128 
129 void
pcmk__drop_all_clients(qb_ipcs_service_t * service)130 pcmk__drop_all_clients(qb_ipcs_service_t *service)
131 {
132     qb_ipcs_connection_t *c = NULL;
133 
134     if (service == NULL) {
135         return;
136     }
137 
138     c = qb_ipcs_connection_first_get(service);
139 
140     while (c != NULL) {
141         qb_ipcs_connection_t *last = c;
142 
143         c = qb_ipcs_connection_next_get(service, last);
144 
145         /* There really shouldn't be anyone connected at this point */
146         crm_notice("Disconnecting client %p, pid=%d...",
147                    last, pcmk__client_pid(last));
148         qb_ipcs_disconnect(last);
149         qb_ipcs_connection_unref(last);
150     }
151 }
152 
153 /*!
154  * \internal
155  * \brief Allocate a new pcmk__client_t object based on an IPC connection
156  *
157  * \param[in] c           IPC connection (or NULL to allocate generic client)
158  * \param[in] key         Connection table key (or NULL to use sane default)
159  * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
160  *
161  * \return Pointer to new pcmk__client_t (or NULL on error)
162  */
163 static pcmk__client_t *
client_from_connection(qb_ipcs_connection_t * c,void * key,uid_t uid_client)164 client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
165 {
166     pcmk__client_t *client = calloc(1, sizeof(pcmk__client_t));
167 
168     if (client == NULL) {
169         crm_perror(LOG_ERR, "Allocating client");
170         return NULL;
171     }
172 
173     if (c) {
174         client->user = pcmk__uid2username(uid_client);
175         if (client->user == NULL) {
176             client->user = strdup("#unprivileged");
177             CRM_CHECK(client->user != NULL, free(client); return NULL);
178             crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
179                     uid_client);
180         }
181         client->ipcs = c;
182         pcmk__set_client_flags(client, pcmk__client_ipc);
183         client->pid = pcmk__client_pid(c);
184         if (key == NULL) {
185             key = c;
186         }
187     }
188 
189     client->id = crm_generate_uuid();
190     if (client->id == NULL) {
191         crm_err("Could not generate UUID for client");
192         free(client->user);
193         free(client);
194         return NULL;
195     }
196     if (key == NULL) {
197         key = client->id;
198     }
199     if (client_connections == NULL) {
200         crm_trace("Creating IPC client table");
201         client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
202     }
203     g_hash_table_insert(client_connections, key, client);
204     return client;
205 }
206 
207 /*!
208  * \brief Allocate a new pcmk__client_t object and generate its ID
209  *
210  * \param[in] key  What to use as connections hash table key (NULL to use ID)
211  *
212  * \return Pointer to new pcmk__client_t (asserts on failure)
213  */
214 pcmk__client_t *
pcmk__new_unauth_client(void * key)215 pcmk__new_unauth_client(void *key)
216 {
217     pcmk__client_t *client = client_from_connection(NULL, key, 0);
218 
219     CRM_ASSERT(client != NULL);
220     return client;
221 }
222 
223 pcmk__client_t *
pcmk__new_client(qb_ipcs_connection_t * c,uid_t uid_client,gid_t gid_client)224 pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
225 {
226     gid_t uid_cluster = 0;
227     gid_t gid_cluster = 0;
228 
229     pcmk__client_t *client = NULL;
230 
231     CRM_CHECK(c != NULL, return NULL);
232 
233     if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
234         static bool need_log = TRUE;
235 
236         if (need_log) {
237             crm_warn("Could not find user and group IDs for user %s",
238                      CRM_DAEMON_USER);
239             need_log = FALSE;
240         }
241     }
242 
243     if (uid_client != 0) {
244         crm_trace("Giving group %u access to new IPC connection", gid_cluster);
245         /* Passing -1 to chown(2) means don't change */
246         qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
247     }
248 
249     /* TODO: Do our own auth checking, return NULL if unauthorized */
250     client = client_from_connection(c, NULL, uid_client);
251     if (client == NULL) {
252         return NULL;
253     }
254 
255     if ((uid_client == 0) || (uid_client == uid_cluster)) {
256         /* Remember when a connection came from root or hacluster */
257         pcmk__set_client_flags(client, pcmk__client_privileged);
258     }
259 
260     crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
261               client->id, client->pid, uid_client, gid_client);
262     return client;
263 }
264 
265 static struct iovec *
pcmk__new_ipc_event(void)266 pcmk__new_ipc_event(void)
267 {
268     struct iovec *iov = calloc(2, sizeof(struct iovec));
269 
270     CRM_ASSERT(iov != NULL);
271     return iov;
272 }
273 
274 /*!
275  * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
276  *
277  * \param[in] event  I/O vector to free
278  */
279 void
pcmk_free_ipc_event(struct iovec * event)280 pcmk_free_ipc_event(struct iovec *event)
281 {
282     if (event != NULL) {
283         free(event[0].iov_base);
284         free(event[1].iov_base);
285         free(event);
286     }
287 }
288 
289 static void
free_event(gpointer data)290 free_event(gpointer data)
291 {
292     pcmk_free_ipc_event((struct iovec *) data);
293 }
294 
295 static void
add_event(pcmk__client_t * c,struct iovec * iov)296 add_event(pcmk__client_t *c, struct iovec *iov)
297 {
298     if (c->event_queue == NULL) {
299         c->event_queue = g_queue_new();
300     }
301     g_queue_push_tail(c->event_queue, iov);
302 }
303 
304 void
pcmk__free_client(pcmk__client_t * c)305 pcmk__free_client(pcmk__client_t *c)
306 {
307     if (c == NULL) {
308         return;
309     }
310 
311     if (client_connections) {
312         if (c->ipcs) {
313             crm_trace("Destroying %p/%p (%d remaining)",
314                       c, c->ipcs, g_hash_table_size(client_connections) - 1);
315             g_hash_table_remove(client_connections, c->ipcs);
316 
317         } else {
318             crm_trace("Destroying remote connection %p (%d remaining)",
319                       c, g_hash_table_size(client_connections) - 1);
320             g_hash_table_remove(client_connections, c->id);
321         }
322     }
323 
324     if (c->event_timer) {
325         g_source_remove(c->event_timer);
326     }
327 
328     if (c->event_queue) {
329         crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
330         g_queue_free_full(c->event_queue, free_event);
331     }
332 
333     free(c->id);
334     free(c->name);
335     free(c->user);
336     if (c->remote) {
337         if (c->remote->auth_timeout) {
338             g_source_remove(c->remote->auth_timeout);
339         }
340         free(c->remote->buffer);
341         free(c->remote);
342     }
343     free(c);
344 }
345 
346 /*!
347  * \internal
348  * \brief Raise IPC eviction threshold for a client, if allowed
349  *
350  * \param[in,out] client     Client to modify
351  * \param[in]     qmax       New threshold (as non-NULL string)
352  *
353  * \return true if change was allowed, false otherwise
354  */
355 bool
pcmk__set_client_queue_max(pcmk__client_t * client,const char * qmax)356 pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
357 {
358     if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
359         long long qmax_ll;
360 
361         if ((pcmk__scan_ll(qmax, &qmax_ll, 0LL) == pcmk_rc_ok)
362             && (qmax_ll > 0LL) && (qmax_ll <= UINT_MAX)) {
363             client->queue_max = (unsigned int) qmax_ll;
364             return true;
365         }
366     }
367     return false;
368 }
369 
370 int
pcmk__client_pid(qb_ipcs_connection_t * c)371 pcmk__client_pid(qb_ipcs_connection_t *c)
372 {
373     struct qb_ipcs_connection_stats stats;
374 
375     stats.client_pid = 0;
376     qb_ipcs_connection_stats_get(c, &stats, 0);
377     return stats.client_pid;
378 }
379 
380 /*!
381  * \internal
382  * \brief Retrieve message XML from data read from client IPC
383  *
384  * \param[in]  c       IPC client connection
385  * \param[in]  data    Data read from client connection
386  * \param[out] id      Where to store message ID from libqb header
387  * \param[out] flags   Where to store flags from libqb header
388  *
389  * \return Message XML on success, NULL otherwise
390  */
391 xmlNode *
pcmk__client_data2xml(pcmk__client_t * c,void * data,uint32_t * id,uint32_t * flags)392 pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id,
393                       uint32_t *flags)
394 {
395     xmlNode *xml = NULL;
396     char *uncompressed = NULL;
397     char *text = ((char *)data) + sizeof(pcmk__ipc_header_t);
398     pcmk__ipc_header_t *header = data;
399 
400     if (!pcmk__valid_ipc_header(header)) {
401         return NULL;
402     }
403 
404     if (id) {
405         *id = ((struct qb_ipc_response_header *)data)->id;
406     }
407     if (flags) {
408         *flags = header->flags;
409     }
410 
411     if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
412         /* Mark this client as being the endpoint of a proxy connection.
413          * Proxy connections responses are sent on the event channel, to avoid
414          * blocking the controller serving as proxy.
415          */
416         pcmk__set_client_flags(c, pcmk__client_proxied);
417     }
418 
419     if (header->size_compressed) {
420         int rc = 0;
421         unsigned int size_u = 1 + header->size_uncompressed;
422         uncompressed = calloc(1, size_u);
423 
424         crm_trace("Decompressing message data %u bytes into %u bytes",
425                   header->size_compressed, size_u);
426 
427         rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
428         text = uncompressed;
429 
430         if (rc != BZ_OK) {
431             crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
432                     bz2_strerror(rc), rc);
433             free(uncompressed);
434             return NULL;
435         }
436     }
437 
438     CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
439 
440     xml = string2xml(text);
441     crm_log_xml_trace(xml, "[IPC received]");
442 
443     free(uncompressed);
444     return xml;
445 }
446 
447 static int crm_ipcs_flush_events(pcmk__client_t *c);
448 
449 static gboolean
crm_ipcs_flush_events_cb(gpointer data)450 crm_ipcs_flush_events_cb(gpointer data)
451 {
452     pcmk__client_t *c = data;
453 
454     c->event_timer = 0;
455     crm_ipcs_flush_events(c);
456     return FALSE;
457 }
458 
459 /*!
460  * \internal
461  * \brief Add progressive delay before next event queue flush
462  *
463  * \param[in,out] c          Client connection to add delay to
464  * \param[in]     queue_len  Current event queue length
465  */
466 static inline void
delay_next_flush(pcmk__client_t * c,unsigned int queue_len)467 delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
468 {
469     /* Delay a maximum of 1.5 seconds */
470     guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
471 
472     c->event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c);
473 }
474 
475 /*!
476  * \internal
477  * \brief Send client any messages in its queue
478  *
479  * \param[in]  c  Client to flush
480  *
481  * \return Standard Pacemaker return value
482  */
483 static int
crm_ipcs_flush_events(pcmk__client_t * c)484 crm_ipcs_flush_events(pcmk__client_t *c)
485 {
486     int rc = pcmk_rc_ok;
487     ssize_t qb_rc = 0;
488     unsigned int sent = 0;
489     unsigned int queue_len = 0;
490 
491     if (c == NULL) {
492         return rc;
493 
494     } else if (c->event_timer) {
495         /* There is already a timer, wait until it goes off */
496         crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
497         return rc;
498     }
499 
500     if (c->event_queue) {
501         queue_len = g_queue_get_length(c->event_queue);
502     }
503     while (sent < 100) {
504         pcmk__ipc_header_t *header = NULL;
505         struct iovec *event = NULL;
506 
507         if (c->event_queue) {
508             // We don't pop unless send is successful
509             event = g_queue_peek_head(c->event_queue);
510         }
511         if (event == NULL) { // Queue is empty
512             break;
513         }
514 
515         qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
516         if (qb_rc < 0) {
517             rc = (int) -qb_rc;
518             break;
519         }
520         event = g_queue_pop_head(c->event_queue);
521 
522         sent++;
523         header = event[0].iov_base;
524         if (header->size_compressed) {
525             crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
526                       header->qb.id, c->ipcs, c->pid, (long long) qb_rc);
527         } else {
528             crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
529                       header->qb.id, c->ipcs, c->pid, (long long) qb_rc,
530                       (char *) (event[1].iov_base));
531         }
532         pcmk_free_ipc_event(event);
533     }
534 
535     queue_len -= sent;
536     if (sent > 0 || queue_len) {
537         crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
538                   sent, queue_len, c->ipcs, c->pid,
539                   pcmk_rc_str(rc), (long long) qb_rc);
540     }
541 
542     if (queue_len) {
543 
544         /* Allow clients to briefly fall behind on processing incoming messages,
545          * but drop completely unresponsive clients so the connection doesn't
546          * consume resources indefinitely.
547          */
548         if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
549             if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
550                 /* Don't evict for a new or shrinking backlog */
551                 crm_warn("Client with process ID %u has a backlog of %u messages "
552                          CRM_XS " %p", c->pid, queue_len, c->ipcs);
553             } else {
554                 crm_err("Evicting client with process ID %u due to backlog of %u messages "
555                          CRM_XS " %p", c->pid, queue_len, c->ipcs);
556                 c->queue_backlog = 0;
557                 qb_ipcs_disconnect(c->ipcs);
558                 return rc;
559             }
560         }
561 
562         c->queue_backlog = queue_len;
563         delay_next_flush(c, queue_len);
564 
565     } else {
566         /* Event queue is empty, there is no backlog */
567         c->queue_backlog = 0;
568     }
569 
570     return rc;
571 }
572 
573 /*!
574  * \internal
575  * \brief Create an I/O vector for sending an IPC XML message
576  *
577  * \param[in]  request        Identifier for libqb response header
578  * \param[in]  message        XML message to send
579  * \param[in]  max_send_size  If 0, default IPC buffer size is used
580  * \param[out] result         Where to store prepared I/O vector
581  * \param[out] bytes          Size of prepared data in bytes
582  *
583  * \return Standard Pacemaker return code
584  */
585 int
pcmk__ipc_prepare_iov(uint32_t request,xmlNode * message,uint32_t max_send_size,struct iovec ** result,ssize_t * bytes)586 pcmk__ipc_prepare_iov(uint32_t request, xmlNode *message,
587                       uint32_t max_send_size, struct iovec **result,
588                       ssize_t *bytes)
589 {
590     static unsigned int biggest = 0;
591     struct iovec *iov;
592     unsigned int total = 0;
593     char *compressed = NULL;
594     char *buffer = NULL;
595     pcmk__ipc_header_t *header = NULL;
596 
597     if ((message == NULL) || (result == NULL)) {
598         return EINVAL;
599     }
600 
601     header = calloc(1, sizeof(pcmk__ipc_header_t));
602     if (header == NULL) {
603        return ENOMEM; /* errno mightn't be set by allocator */
604     }
605 
606     buffer = dump_xml_unformatted(message);
607 
608     if (max_send_size == 0) {
609         max_send_size = crm_ipc_default_buffer_size();
610     }
611     CRM_LOG_ASSERT(max_send_size != 0);
612 
613     *result = NULL;
614     iov = pcmk__new_ipc_event();
615     iov[0].iov_len = sizeof(pcmk__ipc_header_t);
616     iov[0].iov_base = header;
617 
618     header->version = PCMK__IPC_VERSION;
619     header->size_uncompressed = 1 + strlen(buffer);
620     total = iov[0].iov_len + header->size_uncompressed;
621 
622     if (total < max_send_size) {
623         iov[1].iov_base = buffer;
624         iov[1].iov_len = header->size_uncompressed;
625 
626     } else {
627         unsigned int new_size = 0;
628 
629         if (pcmk__compress(buffer, (unsigned int) header->size_uncompressed,
630                            (unsigned int) max_send_size, &compressed,
631                            &new_size) == pcmk_rc_ok) {
632 
633             pcmk__set_ipc_flags(header->flags, "send data", crm_ipc_compressed);
634             header->size_compressed = new_size;
635 
636             iov[1].iov_len = header->size_compressed;
637             iov[1].iov_base = compressed;
638 
639             free(buffer);
640 
641             biggest = QB_MAX(header->size_compressed, biggest);
642 
643         } else {
644             crm_log_xml_trace(message, "EMSGSIZE");
645             biggest = QB_MAX(header->size_uncompressed, biggest);
646 
647             crm_err("Could not compress %u-byte message into less than IPC "
648                     "limit of %u bytes; set PCMK_ipc_buffer to higher value "
649                     "(%u bytes suggested)",
650                     header->size_uncompressed, max_send_size, 4 * biggest);
651 
652             free(compressed);
653             free(buffer);
654             pcmk_free_ipc_event(iov);
655             return EMSGSIZE;
656         }
657     }
658 
659     header->qb.size = iov[0].iov_len + iov[1].iov_len;
660     header->qb.id = (int32_t)request;    /* Replying to a specific request */
661 
662     *result = iov;
663     CRM_ASSERT(header->qb.size > 0);
664     if (bytes != NULL) {
665         *bytes = header->qb.size;
666     }
667     return pcmk_rc_ok;
668 }
669 
670 int
pcmk__ipc_send_iov(pcmk__client_t * c,struct iovec * iov,uint32_t flags)671 pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
672 {
673     int rc = pcmk_rc_ok;
674     static uint32_t id = 1;
675     pcmk__ipc_header_t *header = iov[0].iov_base;
676 
677     if (c->flags & pcmk__client_proxied) {
678         /* _ALL_ replies to proxied connections need to be sent as events */
679         if (!pcmk_is_set(flags, crm_ipc_server_event)) {
680             /* The proxied flag lets us know this was originally meant to be a
681              * response, even though we're sending it over the event channel.
682              */
683             pcmk__set_ipc_flags(flags, "server event",
684                                 crm_ipc_server_event
685                                 |crm_ipc_proxied_relay_response);
686         }
687     }
688 
689     pcmk__set_ipc_flags(header->flags, "server event", flags);
690     if (flags & crm_ipc_server_event) {
691         header->qb.id = id++;   /* We don't really use it, but doesn't hurt to set one */
692 
693         if (flags & crm_ipc_server_free) {
694             crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
695             add_event(c, iov);
696 
697         } else {
698             struct iovec *iov_copy = pcmk__new_ipc_event();
699 
700             crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
701             iov_copy[0].iov_len = iov[0].iov_len;
702             iov_copy[0].iov_base = malloc(iov[0].iov_len);
703             memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
704 
705             iov_copy[1].iov_len = iov[1].iov_len;
706             iov_copy[1].iov_base = malloc(iov[1].iov_len);
707             memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
708 
709             add_event(c, iov_copy);
710         }
711 
712     } else {
713         ssize_t qb_rc;
714 
715         CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */
716 
717         qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
718         if (qb_rc < header->qb.size) {
719             if (qb_rc < 0) {
720                 rc = (int) -qb_rc;
721             }
722             crm_notice("Response %d to pid %d failed: %s "
723                        CRM_XS " bytes=%u rc=%lld ipcs=%p",
724                        header->qb.id, c->pid, pcmk_rc_str(rc),
725                        header->qb.size, (long long) qb_rc, c->ipcs);
726 
727         } else {
728             crm_trace("Response %d sent, %lld bytes to %p[%d]",
729                       header->qb.id, (long long) qb_rc, c->ipcs, c->pid);
730         }
731 
732         if (flags & crm_ipc_server_free) {
733             pcmk_free_ipc_event(iov);
734         }
735     }
736 
737     if (flags & crm_ipc_server_event) {
738         rc = crm_ipcs_flush_events(c);
739     } else {
740         crm_ipcs_flush_events(c);
741     }
742 
743     if ((rc == EPIPE) || (rc == ENOTCONN)) {
744         crm_trace("Client %p disconnected", c->ipcs);
745     }
746     return rc;
747 }
748 
749 int
pcmk__ipc_send_xml(pcmk__client_t * c,uint32_t request,xmlNode * message,uint32_t flags)750 pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, xmlNode *message,
751                    uint32_t flags)
752 {
753     struct iovec *iov = NULL;
754     int rc = pcmk_rc_ok;
755 
756     if (c == NULL) {
757         return EINVAL;
758     }
759     rc = pcmk__ipc_prepare_iov(request, message, crm_ipc_default_buffer_size(),
760                                &iov, NULL);
761     if (rc == pcmk_rc_ok) {
762         pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
763         rc = pcmk__ipc_send_iov(c, iov, flags);
764     } else {
765         pcmk_free_ipc_event(iov);
766         crm_notice("IPC message to pid %d failed: %s " CRM_XS " rc=%d",
767                    c->pid, pcmk_rc_str(rc), rc);
768     }
769     return rc;
770 }
771 
772 /*!
773  * \internal
774  * \brief Send an acknowledgement with a status code to a client
775  *
776  * \param[in] function  Calling function
777  * \param[in] line      Source file line within calling function
778  * \param[in] c         Client to send ack to
779  * \param[in] request   Request ID being replied to
780  * \param[in] status    Exit status code to add to ack
781  * \param[in] flags     IPC flags to use when sending
782  * \param[in] tag       Element name to use for acknowledgement
783  * \param[in] status    Status code to send with acknowledgement
784  *
785  * \return Standard Pacemaker return code
786  */
787 int
pcmk__ipc_send_ack_as(const char * function,int line,pcmk__client_t * c,uint32_t request,uint32_t flags,const char * tag,crm_exit_t status)788 pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
789                       uint32_t request, uint32_t flags, const char *tag,
790                       crm_exit_t status)
791 {
792     int rc = pcmk_rc_ok;
793 
794     if (pcmk_is_set(flags, crm_ipc_client_response)) {
795         xmlNode *ack = create_xml_node(NULL, tag);
796 
797         crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
798                   pcmk__client_name(c), tag, status);
799         c->request_id = 0;
800         crm_xml_add(ack, "function", function);
801         crm_xml_add_int(ack, "line", line);
802         crm_xml_add_int(ack, "status", (int) status);
803         rc = pcmk__ipc_send_xml(c, request, ack, flags);
804         free_xml(ack);
805     }
806     return rc;
807 }
808 
809 /*!
810  * \internal
811  * \brief Add an IPC server to the main loop for the pacemaker-based API
812  *
813  * \param[out] ipcs_ro   New IPC server for read-only pacemaker-based API
814  * \param[out] ipcs_rw   New IPC server for read/write pacemaker-based API
815  * \param[out] ipcs_shm  New IPC server for shared-memory pacemaker-based API
816  * \param[in]  ro_cb     IPC callbacks for read-only API
817  * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
818  *
819  * \note This function exits fatally if unable to create the servers.
820  */
pcmk__serve_based_ipc(qb_ipcs_service_t ** ipcs_ro,qb_ipcs_service_t ** ipcs_rw,qb_ipcs_service_t ** ipcs_shm,struct qb_ipcs_service_handlers * ro_cb,struct qb_ipcs_service_handlers * rw_cb)821 void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
822                            qb_ipcs_service_t **ipcs_rw,
823                            qb_ipcs_service_t **ipcs_shm,
824                            struct qb_ipcs_service_handlers *ro_cb,
825                            struct qb_ipcs_service_handlers *rw_cb)
826 {
827     *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
828                                        QB_IPC_NATIVE, ro_cb);
829 
830     *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
831                                        QB_IPC_NATIVE, rw_cb);
832 
833     *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
834                                         QB_IPC_SHM, rw_cb);
835 
836     if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
837         crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
838         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
839         crm_exit(CRM_EX_FATAL);
840     }
841 }
842 
843 /*!
844  * \internal
845  * \brief Destroy IPC servers for pacemaker-based API
846  *
847  * \param[out] ipcs_ro   IPC server for read-only pacemaker-based API
848  * \param[out] ipcs_rw   IPC server for read/write pacemaker-based API
849  * \param[out] ipcs_shm  IPC server for shared-memory pacemaker-based API
850  *
851  * \note This is a convenience function for calling qb_ipcs_destroy() for each
852  *       argument.
853  */
854 void
pcmk__stop_based_ipc(qb_ipcs_service_t * ipcs_ro,qb_ipcs_service_t * ipcs_rw,qb_ipcs_service_t * ipcs_shm)855 pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
856                      qb_ipcs_service_t *ipcs_rw,
857                      qb_ipcs_service_t *ipcs_shm)
858 {
859     qb_ipcs_destroy(ipcs_ro);
860     qb_ipcs_destroy(ipcs_rw);
861     qb_ipcs_destroy(ipcs_shm);
862 }
863 
864 /*!
865  * \internal
866  * \brief Add an IPC server to the main loop for the pacemaker-controld API
867  *
868  * \param[in] cb  IPC callbacks
869  *
870  * \return Newly created IPC server
871  */
872 qb_ipcs_service_t *
pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers * cb)873 pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
874 {
875     return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
876 }
877 
878 /*!
879  * \internal
880  * \brief Add an IPC server to the main loop for the pacemaker-attrd API
881  *
882  * \param[in] cb  IPC callbacks
883  *
884  * \note This function exits fatally if unable to create the servers.
885  */
886 void
pcmk__serve_attrd_ipc(qb_ipcs_service_t ** ipcs,struct qb_ipcs_service_handlers * cb)887 pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
888                       struct qb_ipcs_service_handlers *cb)
889 {
890     *ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_NATIVE, cb);
891 
892     if (*ipcs == NULL) {
893         crm_err("Failed to create pacemaker-attrd server: exiting and inhibiting respawn");
894         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
895         crm_exit(CRM_EX_FATAL);
896     }
897 }
898 
899 /*!
900  * \internal
901  * \brief Add an IPC server to the main loop for the pacemaker-fenced API
902  *
903  * \param[in] cb  IPC callbacks
904  *
905  * \note This function exits fatally if unable to create the servers.
906  */
907 void
pcmk__serve_fenced_ipc(qb_ipcs_service_t ** ipcs,struct qb_ipcs_service_handlers * cb)908 pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
909                        struct qb_ipcs_service_handlers *cb)
910 {
911     *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
912                                               QB_LOOP_HIGH);
913 
914     if (*ipcs == NULL) {
915         crm_err("Failed to create fencer: exiting and inhibiting respawn.");
916         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
917         crm_exit(CRM_EX_FATAL);
918     }
919 }
920 
921 /*!
922  * \internal
923  * \brief Add an IPC server to the main loop for the pacemakerd API
924  *
925  * \param[in] cb  IPC callbacks
926  *
927  * \note This function exits with CRM_EX_OSERR if unable to create the servers.
928  */
929 void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t ** ipcs,struct qb_ipcs_service_handlers * cb)930 pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
931                        struct qb_ipcs_service_handlers *cb)
932 {
933     *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);
934 
935     if (*ipcs == NULL) {
936         crm_err("Couldn't start pacemakerd IPC server");
937         crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
938         /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
939          * if we want to prevent pacemakerd from restarting them.
940          * With pacemakerd we leave the exit-code shown to e.g. systemd
941          * to what it was prior to moving the code here from pacemakerd.c
942          */
943         crm_exit(CRM_EX_OSERR);
944     }
945 }
946 
947 /*!
948  * \brief Check whether string represents a client name used by cluster daemons
949  *
950  * \param[in] name  String to check
951  *
952  * \return true if name is standard client name used by daemons, false otherwise
953  *
954  * \note This is provided by the client, and so cannot be used by itself as a
955  *       secure means of authentication.
956  */
957 bool
crm_is_daemon_name(const char * name)958 crm_is_daemon_name(const char *name)
959 {
960     name = pcmk__message_name(name);
961     return (!strcmp(name, CRM_SYSTEM_CRMD)
962             || !strcmp(name, CRM_SYSTEM_STONITHD)
963             || !strcmp(name, "stonith-ng")
964             || !strcmp(name, "attrd")
965             || !strcmp(name, CRM_SYSTEM_CIB)
966             || !strcmp(name, CRM_SYSTEM_MCP)
967             || !strcmp(name, CRM_SYSTEM_DC)
968             || !strcmp(name, CRM_SYSTEM_TENGINE)
969             || !strcmp(name, CRM_SYSTEM_LRMD));
970 }
971