1 /*
2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  */
18 
19 #include <crm_internal.h>
20 #include <crm/cluster/internal.h>
21 #include <sys/types.h>
22 #include <sys/uio.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
27 #include <unistd.h>
28 #include <fcntl.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <string.h>
34 
35 #include <corosync/totem/totempg.h>
36 #include <corosync/engine/objdb.h>
37 #include <corosync/engine/config.h>
38 
39 #include <config.h>
40 #include "plugin.h"
41 #include "utils.h"
42 
43 #include <glib.h>
44 
45 #include <sys/resource.h>
46 #include <sys/utsname.h>
47 #include <sys/socket.h>
48 #include <sys/wait.h>
49 #include <sys/stat.h>
50 #include <pthread.h>
51 #include <bzlib.h>
52 #include <pwd.h>
53 
/* Handle to the corosync API; assigned from the argument of pcmk_startup() */
struct corosync_api_v1 *pcmk_api = NULL;

/* Vote counters compared by plugin_has_quorum():
 * quorum is reported when more than half of the expected votes are present
 */
uint32_t plugin_has_votes = 0;
uint32_t plugin_expected_votes = 2;

int use_mgmtd = 0;
/* Set by process_ais_conf() from the "debug" logging option */
int plugin_log_level = LOG_DEBUG;
/* Node name reported by uname(), filled in by pcmk_startup() */
char *local_uname = NULL;
int local_uname_len = 0;
/* Value of the "clustername" service option (default "pcmk") */
char *local_cname = NULL;
int local_cname_len = 0;
/* Last value returned by totem_nodeid_get(), see pcmk_update_nodeid() */
uint32_t local_nodeid = 0;
char *ipc_channel_name = NULL;
static uint64_t local_born_on = 0;

/* Ring sequence number of the most recent membership event */
uint64_t membership_seq = 0;
/* Thread running pcmk_wait_dispatch(); only created when use_mcp is FALSE */
pthread_t pcmk_wait_thread;

/* TRUE when the "ver" service option is "1" (see process_ais_conf()) */
gboolean use_mcp = FALSE;
/* Loop condition of pcmk_wait_dispatch() */
gboolean wait_active = TRUE;
/* Set once a stable membership with more than one member has been seen */
gboolean have_reliable_membership_id = FALSE;
/* Hash tables created in pcmk_startup(); membership_list is keyed by
 * node id and frees its values with destroy_ais_node()
 */
GHashTable *ipc_client_list = NULL;
GHashTable *membership_list = NULL;
GHashTable *membership_notify_list = NULL;

/* A child is no longer respawned once its respawn_count exceeds this */
#define MAX_RESPAWN		100
/* 0x0100007F - presumably 127.0.0.1 seen as a node id; not used in the
 * part of the file reviewed here - TODO confirm
 */
#define LOOPBACK_ID		16777343
/* Values used in the third column of pcmk_children[] */
#define crm_flag_none		0x00000000
#define crm_flag_members	0x00000001
83 
/*
 * Fixed-layout (packed) message carrying a node's identity.
 *
 * NOTE(review): presumably the payload built by send_cluster_id() and
 * consumed by pcmk_cluster_id_callback(); neither is visible here - confirm.
 */
struct crm_identify_msg_s {
    /* corosync IPC request header, forced to 8-byte alignment */
    cs_ipc_header_request_t header __attribute__ ((aligned(8)));
    uint32_t id;
    uint32_t pid;
    int32_t votes;
    uint32_t processes;
    char uname[256];
    char version[256];
    uint64_t born_on;
} __attribute__ ((packed));
94 
95 /* *INDENT-OFF* */
/*
 * Table of the child daemons known to the plugin.
 *
 * The initialisers are positional; the field order is defined by
 * crm_child_t (declared elsewhere).  NOTE(review): judging by how the
 * fields are used in this file the columns are presumably
 *   pid, flag, flags, start_seq, respawn_count, respawn, name,
 *   uid, command, conn, async_conn
 * - confirm against the crm_child_t declaration.
 *
 * pcmk_startup() launches entries in ascending start_seq order and never
 * starts an entry whose start_seq is below 1.
 */
static crm_child_t pcmk_children[] = {
    { 0, crm_proc_none,     crm_flag_none,    0, 0, FALSE, "none",     NULL,		NULL,			   NULL, NULL },
    { 0, crm_proc_plugin,      crm_flag_none,    0, 0, FALSE, "ais",      NULL,		NULL,			   NULL, NULL },
    { 0, crm_proc_lrmd,     crm_flag_none,    3, 0, TRUE,  "lrmd",     NULL,		CRM_DAEMON_DIR"/lrmd",     NULL, NULL },
    { 0, crm_proc_cib,      crm_flag_members, 1, 0, TRUE,  "cib",      CRM_DAEMON_USER, CRM_DAEMON_DIR"/cib",      NULL, NULL },
    { 0, crm_proc_crmd,     crm_flag_members, 6, 0, TRUE,  "crmd",     CRM_DAEMON_USER, CRM_DAEMON_DIR"/crmd",     NULL, NULL },
    { 0, crm_proc_attrd,    crm_flag_none,    4, 0, TRUE,  "attrd",    CRM_DAEMON_USER, CRM_DAEMON_DIR"/attrd",    NULL, NULL },
    { 0, crm_proc_stonithd, crm_flag_none,    0, 0, TRUE,  "stonithd", NULL,		"/bin/false",		   NULL, NULL },
    { 0, crm_proc_pe,       crm_flag_none,    5, 0, TRUE,  "pengine",  CRM_DAEMON_USER, CRM_DAEMON_DIR"/pengine",  NULL, NULL },
    { 0, crm_proc_mgmtd,    crm_flag_none,    7, 0, TRUE,  "mgmtd",    NULL,		HB_DAEMON_DIR"/mgmtd",     NULL, NULL },
    { 0, crm_proc_stonith_ng, crm_flag_members, 2, 0, TRUE,  "stonith-ng", NULL,		CRM_DAEMON_DIR"/stonithd", NULL, NULL },
};
108 /* *INDENT-ON* */
109 
/* Helpers implemented elsewhere in the plugin */
void send_cluster_id(void);
int send_plugin_msg_raw(const AIS_Message * ais_msg);
char *pcmk_generate_membership_data(void);
gboolean check_message_sanity(const AIS_Message * msg, const char *data);

/* Read-only message payload type used by the handler signatures below */
typedef const void ais_void_ptr;

/* Service life-cycle hooks referenced from pcmk_service_handler */
int pcmk_shutdown(void);
void pcmk_peer_update(enum totem_configuration_type configuration_type,
                      const unsigned int *member_list, size_t member_list_entries,
                      const unsigned int *left_list, size_t left_list_entries,
                      const unsigned int *joined_list, size_t joined_list_entries,
                      const struct memb_ring_id *ring_id);

int pcmk_startup(struct corosync_api_v1 *corosync_api);
int pcmk_config_init(struct corosync_api_v1 *corosync_api);

/* IPC connection hooks and the handler for library class 0 */
int pcmk_ipc_exit(void *conn);
int pcmk_ipc_connect(void *conn);
void pcmk_ipc(void *conn, ais_void_ptr * msg);

/* Executive handler 0 and its endian-conversion routine */
void pcmk_exec_dump(void);
void pcmk_cluster_swab(void *msg);
void pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid);

/* Handlers for library classes 1-5 (see pcmk_lib_service) */
void pcmk_nodeid(void *conn, ais_void_ptr * msg);
void pcmk_nodes(void *conn, ais_void_ptr * msg);
void pcmk_notify(void *conn, ais_void_ptr * msg);
void pcmk_remove_member(void *conn, ais_void_ptr * msg);
void pcmk_quorum(void *conn, ais_void_ptr * msg);

/* Executive handler 1 and its endian-conversion routine */
void pcmk_cluster_id_swab(void *msg);
void pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid);
/* Peer removal, by node id rendered as a string or by node name */
void ais_remove_peer(char *node_id);
void ais_remove_peer_by_name(const char *node_name);
144 
145 static uint32_t
get_process_list(void)146 get_process_list(void)
147 {
148     int lpc = 0;
149     uint32_t procs = crm_proc_plugin;
150 
151     if (use_mcp) {
152         return 0;
153     }
154 
155     for (lpc = 0; lpc < SIZEOF(pcmk_children); lpc++) {
156         if (pcmk_children[lpc].pid != 0) {
157             procs |= pcmk_children[lpc].flag;
158         }
159     }
160     return procs;
161 }
162 
/*
 * Library (IPC) request handlers.
 *
 * The position of an entry in the array is the message class it serves;
 * the comment on each entry names that class.  None of the handlers
 * requests flow control.
 */
static struct corosync_lib_handler pcmk_lib_service[] = {
    {                           /* 0 - crm_class_cluster */
     .lib_handler_fn = pcmk_ipc,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
    {                           /* 1 - crm_class_members */
     .lib_handler_fn = pcmk_nodes,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
    {                           /* 2 - crm_class_notify */
     .lib_handler_fn = pcmk_notify,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
    {                           /* 3 - crm_class_nodeid */
     .lib_handler_fn = pcmk_nodeid,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
    {                           /* 4 - crm_class_rmpeer */
     .lib_handler_fn = pcmk_remove_member,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
    {                           /* 5 - crm_class_quorum */
     .lib_handler_fn = pcmk_quorum,
     .flow_control = COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
     },
};
189 
/*
 * Executive (cluster message) handlers.
 *
 * Each entry pairs a message handler with the routine that byte-swaps
 * that message type.
 */
static struct corosync_exec_handler pcmk_exec_service[] = {
    {                           /* 0 */
     .exec_handler_fn = pcmk_cluster_callback,
     .exec_endian_convert_fn = pcmk_cluster_swab},
    {                           /* 1 */
     .exec_handler_fn = pcmk_cluster_id_callback,
     .exec_endian_convert_fn = pcmk_cluster_id_swab}
};
198 
199 /*
200  * Exports the interface for the service
201  */
202 /* *INDENT-OFF* */
/*
 * Service engine descriptor handed to corosync by pcmk_get_handler_ver0().
 *
 * It wires the life-cycle hooks, the membership-change callback and the
 * two handler tables above; the table sizes are derived with sizeof so
 * they follow the arrays automatically.
 */
struct corosync_service_engine pcmk_service_handler = {
    .name			= (char *)"Pacemaker Cluster Manager "PACEMAKER_VERSION,
    .id				= PCMK_SERVICE_ID,
    .private_data_size		= 0,
    .flow_control		= COROSYNC_LIB_FLOW_CONTROL_NOT_REQUIRED,
    .allow_inquorate		= CS_LIB_ALLOW_INQUORATE,
    .lib_init_fn		= pcmk_ipc_connect,
    .lib_exit_fn		= pcmk_ipc_exit,
    .exec_init_fn		= pcmk_startup,
    .exec_exit_fn		= pcmk_shutdown,
    .config_init_fn		= pcmk_config_init,
    .priority			= 50,
    .lib_engine			= pcmk_lib_service,
    .lib_engine_count		= sizeof (pcmk_lib_service) / sizeof (struct corosync_lib_handler),
    .exec_engine		= pcmk_exec_service,
    .exec_engine_count		= sizeof (pcmk_exec_service) / sizeof (struct corosync_exec_handler),
    .confchg_fn			= pcmk_peer_update,
    .exec_dump_fn		= pcmk_exec_dump,
    /* The sync_* hooks below are intentionally left unset */
/* 	void (*sync_init) (void); */
/* 	int (*sync_process) (void); */
/* 	void (*sync_activate) (void); */
/* 	void (*sync_abort) (void); */
};
226 
227 
228 /*
229  * Dynamic Loader definition
230  */
/* Accessor returning the service engine descriptor (defined below) */
struct corosync_service_engine *pcmk_get_handler_ver0 (void);

/* Interface object through which the loader obtains that accessor */
struct corosync_service_engine_iface_ver0 pcmk_service_handler_iface = {
    .corosync_get_service_engine_ver0 = pcmk_get_handler_ver0
};
236 
/*
 * The two loader interfaces exported by this component: both are named
 * "pacemaker" and differ only in version (0 and 1).
 *
 * .interfaces is left NULL here and filled in by
 * register_this_component() before the component is registered.
 */
static struct lcr_iface openais_pcmk_ver0[2] = {
    {
	.name				= "pacemaker",
	.version			= 0,
	.versions_replace		= 0,
	.versions_replace_count		= 0,
	.dependencies			= 0,
	.dependency_count		= 0,
	.constructor			= NULL,
	.destructor			= NULL,
	.interfaces			= NULL
    },
    {
	.name				= "pacemaker",
	.version			= 1,
	.versions_replace		= 0,
	.versions_replace_count		= 0,
	.dependencies			= 0,
	.dependency_count		= 0,
	.constructor			= NULL,
	.destructor			= NULL,
	.interfaces			= NULL
    }
};
261 
/* Component descriptor bundling both entries of openais_pcmk_ver0[] */
static struct lcr_comp pcmk_comp_ver0 = {
    .iface_count			= 2,
    .ifaces				= openais_pcmk_ver0
};
266 /* *INDENT-ON* */
267 
268 struct corosync_service_engine *
pcmk_get_handler_ver0(void)269 pcmk_get_handler_ver0(void)
270 {
271     return (&pcmk_service_handler);
272 }
273 
274 __attribute__ ((constructor))
275 static void
register_this_component(void)276 register_this_component(void)
277 {
278     lcr_interfaces_set(&openais_pcmk_ver0[0], &pcmk_service_handler_iface);
279     lcr_interfaces_set(&openais_pcmk_ver0[1], &pcmk_service_handler_iface);
280 
281     lcr_component_register(&pcmk_comp_ver0);
282 }
283 
284 static int
plugin_has_quorum(void)285 plugin_has_quorum(void)
286 {
287     if ((plugin_expected_votes >> 1) < plugin_has_votes) {
288         return 1;
289     }
290     return 0;
291 }
292 
293 static void
update_expected_votes(int value)294 update_expected_votes(int value)
295 {
296     if (value < plugin_has_votes) {
297         /* Never drop below the number of connected nodes */
298         ais_info("Cannot update expected quorum votes %d -> %d:"
299                  " value cannot be less that the current number of votes",
300                  plugin_expected_votes, value);
301 
302     } else if (plugin_expected_votes != value) {
303         ais_info("Expected quorum votes %d -> %d", plugin_expected_votes, value);
304         plugin_expected_votes = value;
305     }
306 }
307 
308 /* Create our own local copy of the config so we can navigate it */
309 static void
process_ais_conf(void)310 process_ais_conf(void)
311 {
312     char *value = NULL;
313     gboolean any_log = FALSE;
314     hdb_handle_t top_handle = 0;
315     hdb_handle_t local_handle = 0;
316 
317     ais_info("Reading configure");
318     top_handle = config_find_init(pcmk_api, "logging");
319     local_handle = config_find_next(pcmk_api, "logging", top_handle);
320 
321     get_config_opt(pcmk_api, local_handle, "debug", &value, "on");
322     if (ais_get_boolean(value)) {
323         plugin_log_level = LOG_DEBUG;
324         pcmk_env.debug = "1";
325 
326     } else {
327         plugin_log_level = LOG_INFO;
328         pcmk_env.debug = "0";
329     }
330 
331     get_config_opt(pcmk_api, local_handle, "to_logfile", &value, "off");
332     if (ais_get_boolean(value)) {
333         get_config_opt(pcmk_api, local_handle, "logfile", &value, NULL);
334 
335         if (value == NULL) {
336             ais_err("Logging to a file requested but no log file specified");
337 
338         } else {
339             uid_t pcmk_uid = geteuid();
340             uid_t pcmk_gid = getegid();
341 
342             FILE *logfile = fopen(value, "a");
343 
344             if (logfile) {
345                 int ignore = 0;
346                 int logfd = fileno(logfile);
347 
348                 pcmk_env.logfile = value;
349 
350                 /* Ensure the file has the correct permissions */
351                 ignore = fchown(logfd, pcmk_uid, pcmk_gid);
352                 ignore = fchmod(logfd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
353 
354                 if (ignore < 0) {
355                     fprintf(logfile, "Could not set r/w permissions for uid=%d, gid=%d on %s\n",
356                             pcmk_uid, pcmk_gid, value);
357 
358                 } else {
359                     fprintf(logfile, "Set r/w permissions for uid=%d, gid=%d on %s\n",
360                             pcmk_uid, pcmk_gid, value);
361                 }
362                 fflush(logfile);
363                 fsync(logfd);
364                 fclose(logfile);
365                 any_log = TRUE;
366 
367             } else {
368                 ais_err("Couldn't create logfile: %s", value);
369             }
370         }
371     }
372 
373     get_config_opt(pcmk_api, local_handle, "to_syslog", &value, "on");
374     if (any_log && ais_get_boolean(value) == FALSE) {
375         ais_info("User configured file based logging and explicitly disabled syslog.");
376         value = "none";
377 
378     } else {
379         if (ais_get_boolean(value) == FALSE) {
380             ais_err
381                 ("Please enable some sort of logging, either 'to_file: on' or  'to_syslog: on'.");
382             ais_err("If you use file logging, be sure to also define a value for 'logfile'");
383         }
384         get_config_opt(pcmk_api, local_handle, "syslog_facility", &value, "daemon");
385     }
386     pcmk_env.syslog = value;
387 
388     config_find_done(pcmk_api, local_handle);
389 
390     top_handle = config_find_init(pcmk_api, "quorum");
391     local_handle = config_find_next(pcmk_api, "quorum", top_handle);
392     get_config_opt(pcmk_api, local_handle, "provider", &value, NULL);
393     if (value && ais_str_eq("quorum_cman", value)) {
394         pcmk_env.quorum = "cman";
395     } else {
396         pcmk_env.quorum = "pcmk";
397     }
398 
399     top_handle = config_find_init(pcmk_api, "service");
400     local_handle = config_find_next(pcmk_api, "service", top_handle);
401     while (local_handle) {
402         value = NULL;
403         pcmk_api->object_key_get(local_handle, "name", strlen("name"), (void **)&value, NULL);
404         if (ais_str_eq("pacemaker", value)) {
405             break;
406         }
407         local_handle = config_find_next(pcmk_api, "service", top_handle);
408     }
409 
410     get_config_opt(pcmk_api, local_handle, "ver", &value, "0");
411     if (ais_str_eq(value, "1")) {
412         ais_info("Enabling MCP mode: Use the Pacemaker init script to complete Pacemaker startup");
413         use_mcp = TRUE;
414     }
415 
416     get_config_opt(pcmk_api, local_handle, "clustername", &local_cname, "pcmk");
417     local_cname_len = strlen(local_cname);
418 
419     get_config_opt(pcmk_api, local_handle, "use_logd", &value, "no");
420     pcmk_env.use_logd = value;
421 
422     get_config_opt(pcmk_api, local_handle, "use_mgmtd", &value, "no");
423     if (ais_get_boolean(value) == FALSE) {
424         int lpc = 0;
425 
426         for (; lpc < SIZEOF(pcmk_children); lpc++) {
427             if (crm_proc_mgmtd & pcmk_children[lpc].flag) {
428                 /* Disable mgmtd startup */
429                 pcmk_children[lpc].start_seq = 0;
430                 break;
431             }
432         }
433     }
434 
435     config_find_done(pcmk_api, local_handle);
436 }
437 
/*
 * Configuration hook of the service engine (config_init_fn).
 *
 * Nothing is done at this stage; the argument is ignored and 0 is
 * always returned.
 */
int
pcmk_config_init(struct corosync_api_v1 *unused)
{
    (void) unused;

    return 0;
}
443 
444 static void *
pcmk_wait_dispatch(void * arg)445 pcmk_wait_dispatch(void *arg)
446 {
447     struct timespec waitsleep = {
448         .tv_sec = 1,
449         .tv_nsec = 0
450     };
451 
452     while (wait_active) {
453         int lpc = 0;
454 
455         for (; lpc < SIZEOF(pcmk_children); lpc++) {
456             if (pcmk_children[lpc].pid > 0) {
457                 int status;
458                 pid_t pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
459 
460                 if (pid == 0) {
461                     continue;
462 
463                 } else if (pid < 0) {
464                     ais_perror("Call to wait4(%s) failed", pcmk_children[lpc].name);
465                     continue;
466                 }
467 
468                 /* cleanup */
469                 pcmk_children[lpc].pid = 0;
470                 pcmk_children[lpc].conn = NULL;
471                 pcmk_children[lpc].async_conn = NULL;
472 
473                 if (WIFSIGNALED(status)) {
474                     int sig = WTERMSIG(status);
475 
476                     ais_err("Child process %s terminated with signal %d"
477                             " (pid=%d, core=%s)",
478                             pcmk_children[lpc].name, sig, pid,
479                             WCOREDUMP(status) ? "true" : "false");
480 
481                 } else if (WIFEXITED(status)) {
482                     int rc = WEXITSTATUS(status);
483 
484                     do_ais_log(rc == 0 ? LOG_NOTICE : LOG_ERR,
485                                "Child process %s exited (pid=%d, rc=%d)", pcmk_children[lpc].name,
486                                pid, rc);
487 
488                     if (rc == 100) {
489                         ais_notice("Child process %s no longer wishes"
490                                    " to be respawned", pcmk_children[lpc].name);
491                         pcmk_children[lpc].respawn = FALSE;
492                     }
493                 }
494 
495                 /* Broadcast the fact that one of our processes died
496                  *
497                  * Try to get some logging of the cause out first though
498                  * because we're probably about to get fenced
499                  *
500                  * Potentially do this only if respawn_count > N
501                  * to allow for local recovery
502                  */
503                 send_cluster_id();
504 
505                 pcmk_children[lpc].respawn_count += 1;
506                 if (pcmk_children[lpc].respawn_count > MAX_RESPAWN) {
507                     ais_err("Child respawn count exceeded by %s", pcmk_children[lpc].name);
508                     pcmk_children[lpc].respawn = FALSE;
509                 }
510                 if (pcmk_children[lpc].respawn) {
511                     ais_notice("Respawning failed child process: %s", pcmk_children[lpc].name);
512                     spawn_child(&(pcmk_children[lpc]));
513                 }
514                 send_cluster_id();
515             }
516         }
517         sched_yield();
518         nanosleep(&waitsleep, 0);
519     }
520     return 0;
521 }
522 
523 static uint32_t
pcmk_update_nodeid(void)524 pcmk_update_nodeid(void)
525 {
526     int last = local_nodeid;
527 
528     local_nodeid = pcmk_api->totem_nodeid_get();
529 
530     if (last != local_nodeid) {
531         if (last == 0) {
532             ais_info("Local node id: %u", local_nodeid);
533 
534         } else {
535             char *last_s = NULL;
536 
537             ais_malloc0(last_s, 32);
538             ais_warn("Detected local node id change: %u -> %u", last, local_nodeid);
539             snprintf(last_s, 31, "%u", last);
540             ais_remove_peer(last_s);
541             ais_free(last_s);
542         }
543         update_member(local_nodeid, 0, 0, 1, 0, local_uname, CRM_NODE_MEMBER, NULL);
544     }
545 
546     return local_nodeid;
547 }
548 
549 static void
build_path(const char * path_c,mode_t mode)550 build_path(const char *path_c, mode_t mode)
551 {
552     int offset = 1, len = 0;
553     char *path = ais_strdup(path_c);
554 
555     AIS_CHECK(path != NULL, return);
556     for (len = strlen(path); offset < len; offset++) {
557         if (path[offset] == '/') {
558             path[offset] = 0;
559             if (mkdir(path, mode) < 0 && errno != EEXIST) {
560                 ais_perror("Could not create directory '%s'", path);
561                 break;
562             }
563             path[offset] = '/';
564         }
565     }
566     if (mkdir(path, mode) < 0 && errno != EEXIST) {
567         ais_perror("Could not create directory '%s'", path);
568     }
569     ais_free(path);
570 }
571 
572 int
pcmk_startup(struct corosync_api_v1 * init_with)573 pcmk_startup(struct corosync_api_v1 *init_with)
574 {
575     int rc = 0;
576     int lpc = 0;
577     int start_seq = 1;
578     struct utsname us;
579     struct rlimit cores;
580     static int max = SIZEOF(pcmk_children);
581 
582     uid_t pcmk_uid = 0;
583     gid_t pcmk_gid = 0;
584 
585     uid_t root_uid = -1;
586     uid_t cs_uid = geteuid();
587 
588     pcmk_user_lookup("root", &root_uid, NULL);
589 
590     pcmk_api = init_with;
591 
592     pcmk_env.debug = "0";
593     pcmk_env.logfile = NULL;
594     pcmk_env.use_logd = "false";
595     pcmk_env.syslog = "daemon";
596 
597     if (cs_uid != root_uid) {
598         ais_err("Corosync must be configured to start as 'root',"
599                 " otherwise Pacemaker cannot manage services."
600                 "  Expected %d got %d", root_uid, cs_uid);
601         return -1;
602     }
603 
604     process_ais_conf();
605 
606     membership_list = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, destroy_ais_node);
607     membership_notify_list = g_hash_table_new(g_direct_hash, g_direct_equal);
608     ipc_client_list = g_hash_table_new(g_direct_hash, g_direct_equal);
609 
610     ais_info("CRM: Initialized");
611     log_printf(LOG_INFO, "Logging: Initialized %s\n", __FUNCTION__);
612 
613     rc = getrlimit(RLIMIT_CORE, &cores);
614     if (rc < 0) {
615         ais_perror("Cannot determine current maximum core size.");
616     } else {
617         if (cores.rlim_max == 0 && geteuid() == 0) {
618             cores.rlim_max = RLIM_INFINITY;
619         } else {
620             ais_info("Maximum core file size is: %lu", cores.rlim_max);
621         }
622         cores.rlim_cur = cores.rlim_max;
623 
624         rc = setrlimit(RLIMIT_CORE, &cores);
625         if (rc < 0) {
626             ais_perror("Core file generation will remain disabled."
627                        " Core files are an important diagnositic tool,"
628                        " please consider enabling them by default.");
629         }
630 #if 0
631         /* system() is not thread-safe, can't call from here
632          * Actually, it's a pretty hacky way to try and achieve this anyway
633          */
634         if (system("echo 1 > /proc/sys/kernel/core_uses_pid") != 0) {
635             ais_perror("Could not enable /proc/sys/kernel/core_uses_pid");
636         }
637 #endif
638     }
639 
640     if (pcmk_user_lookup(CRM_DAEMON_USER, &pcmk_uid, &pcmk_gid) < 0) {
641         ais_err("Cluster user %s does not exist, aborting Pacemaker startup", CRM_DAEMON_USER);
642         return TRUE;
643     }
644 
645     rc = mkdir(CRM_STATE_DIR, 0750);
646     rc = chown(CRM_STATE_DIR, pcmk_uid, pcmk_gid);
647 
648     /* Used by stonithd */
649     build_path(HA_STATE_DIR "/heartbeat", 0755);
650 
651     /* Used by RAs - Leave owned by root */
652     build_path(CRM_RSCTMP_DIR, 0755);
653 
654     rc = uname(&us);
655     AIS_ASSERT(rc == 0);
656     local_uname = ais_strdup(us.nodename);
657     local_uname_len = strlen(local_uname);
658 
659     ais_info("Service: %d", PCMK_SERVICE_ID);
660     ais_info("Local hostname: %s", local_uname);
661     pcmk_update_nodeid();
662 
663     if (use_mcp == FALSE) {
664         pthread_create(&pcmk_wait_thread, NULL, pcmk_wait_dispatch, NULL);
665         for (start_seq = 1; start_seq < max; start_seq++) {
666             /* don't start anything with start_seq < 1 */
667             for (lpc = 0; lpc < max; lpc++) {
668                 if (start_seq == pcmk_children[lpc].start_seq) {
669                     spawn_child(&(pcmk_children[lpc]));
670                 }
671             }
672         }
673     }
674     return 0;
675 }
676 
#if 0
/* Reference copy of totempg_ifaces_print() from exec/totempg.c.
 *
 * Compiled out: it only documents the helper that pcmk_peer_update()
 * calls to obtain a node's address string.  Note that the result lives
 * in a static buffer, so callers must copy it.
 */
char *
totempg_ifaces_print(unsigned int nodeid)
{
    static char iface_string[256 * INTERFACE_MAX];
    char one_iface[64];
    struct totem_ip_address interfaces[INTERFACE_MAX];
    char **status;
    unsigned int iface_count;
    unsigned int i;
    int res;

    iface_string[0] = '\0';

    res = totempg_ifaces_get(nodeid, interfaces, &status, &iface_count);
    if (res == -1) {
        return ("no interface found for nodeid");
    }

    for (i = 0; i < iface_count; i++) {
        sprintf(one_iface, "r(%d) ip(%s), ", i, totemip_print(&interfaces[i]));
        strcat(iface_string, one_iface);
    }
    return (iface_string);
}
#endif
704 
705 static void
ais_mark_unseen_peer_dead(gpointer key,gpointer value,gpointer user_data)706 ais_mark_unseen_peer_dead(gpointer key, gpointer value, gpointer user_data)
707 {
708     int *changed = user_data;
709     crm_node_t *node = value;
710 
711     if (node->last_seen != membership_seq && ais_str_eq(CRM_NODE_LOST, node->state) == FALSE) {
712         ais_info("Node %s was not seen in the previous transition", node->uname);
713         *changed += update_member(node->id, 0, membership_seq, node->votes,
714                                   node->processes, node->uname, CRM_NODE_LOST, NULL);
715     }
716 }
717 
718 void
pcmk_peer_update(enum totem_configuration_type configuration_type,const unsigned int * member_list,size_t member_list_entries,const unsigned int * left_list,size_t left_list_entries,const unsigned int * joined_list,size_t joined_list_entries,const struct memb_ring_id * ring_id)719 pcmk_peer_update(enum totem_configuration_type configuration_type,
720                  const unsigned int *member_list, size_t member_list_entries,
721                  const unsigned int *left_list, size_t left_list_entries,
722                  const unsigned int *joined_list, size_t joined_list_entries,
723                  const struct memb_ring_id *ring_id)
724 {
725     int lpc = 0;
726     int changed = 0;
727     int do_update = 0;
728 
729     AIS_ASSERT(ring_id != NULL);
730     switch (configuration_type) {
731         case TOTEM_CONFIGURATION_REGULAR:
732             do_update = 1;
733             break;
734         case TOTEM_CONFIGURATION_TRANSITIONAL:
735             break;
736     }
737 
738     membership_seq = ring_id->seq;
739     ais_notice("%s membership event on ring %lld: memb=%ld, new=%ld, lost=%ld",
740                do_update ? "Stable" : "Transitional", ring_id->seq,
741                (long)member_list_entries, (long)joined_list_entries, (long)left_list_entries);
742 
743     if (do_update == 0) {
744         for (lpc = 0; lpc < joined_list_entries; lpc++) {
745             const char *prefix = "new: ";
746             uint32_t nodeid = joined_list[lpc];
747 
748             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
749         }
750         for (lpc = 0; lpc < member_list_entries; lpc++) {
751             const char *prefix = "memb:";
752             uint32_t nodeid = member_list[lpc];
753 
754             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
755         }
756         for (lpc = 0; lpc < left_list_entries; lpc++) {
757             const char *prefix = "lost:";
758             uint32_t nodeid = left_list[lpc];
759 
760             ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
761         }
762         return;
763     }
764 
765     for (lpc = 0; lpc < joined_list_entries; lpc++) {
766         const char *prefix = "NEW: ";
767         uint32_t nodeid = joined_list[lpc];
768         crm_node_t *node = NULL;
769 
770         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
771 
772         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
773 
774         node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(nodeid));
775         if (node->addr == NULL) {
776             const char *addr = totempg_ifaces_print(nodeid);
777 
778             node->addr = ais_strdup(addr);
779             ais_debug("Node %u has address %s", nodeid, node->addr);
780         }
781     }
782 
783     for (lpc = 0; lpc < member_list_entries; lpc++) {
784         const char *prefix = "MEMB:";
785         uint32_t nodeid = member_list[lpc];
786 
787         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_MEMBER, NULL);
788 
789         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
790     }
791 
792     for (lpc = 0; lpc < left_list_entries; lpc++) {
793         const char *prefix = "LOST:";
794         uint32_t nodeid = left_list[lpc];
795 
796         changed += update_member(nodeid, 0, membership_seq, -1, 0, NULL, CRM_NODE_LOST, NULL);
797         ais_info("%s %s %u", prefix, member_uname(nodeid), nodeid);
798     }
799 
800     if (changed && joined_list_entries == 0 && left_list_entries == 0) {
801         ais_err("Something strange happened: %d", changed);
802         changed = 0;
803     }
804 
805     ais_trace("Reaping unseen nodes...");
806     g_hash_table_foreach(membership_list, ais_mark_unseen_peer_dead, &changed);
807 
808     if (member_list_entries > 1) {
809         /* Used to set born-on in send_cluster_id())
810          * We need to wait until we have at least one peer since first
811          * membership id is based on the one before we stopped and isn't reliable
812          */
813         have_reliable_membership_id = TRUE;
814     }
815 
816     if (changed) {
817         ais_debug("%d nodes changed", changed);
818         pcmk_update_nodeid();
819         send_member_notification();
820     }
821 
822     send_cluster_id();
823 }
824 
825 int
pcmk_ipc_exit(void * conn)826 pcmk_ipc_exit(void *conn)
827 {
828     int lpc = 0;
829     const char *client = NULL;
830     void *async_conn = conn;
831 
832     for (; lpc < SIZEOF(pcmk_children); lpc++) {
833         if (pcmk_children[lpc].conn == conn) {
834             if (wait_active == FALSE) {
835                 /* Make sure the shutdown loop exits */
836                 pcmk_children[lpc].pid = 0;
837             }
838             pcmk_children[lpc].conn = NULL;
839             pcmk_children[lpc].async_conn = NULL;
840             client = pcmk_children[lpc].name;
841             break;
842         }
843     }
844 
845     g_hash_table_remove(membership_notify_list, async_conn);
846     g_hash_table_remove(ipc_client_list, async_conn);
847 
848     if (client) {
849         do_ais_log(LOG_INFO, "Client %s (conn=%p, async-conn=%p) left", client, conn, async_conn);
850     } else {
851         do_ais_log((LOG_DEBUG + 1), "Client %s (conn=%p, async-conn=%p) left",
852                    "unknown-transient", conn, async_conn);
853     }
854 
855     return (0);
856 }
857 
/*
 * Connect hook of the service engine (lib_init_fn).
 *
 * Deliberately does nothing: corosync has not finished setting up the
 * connection at this point and sending messages now would break the
 * protocol.  Always returns 0.
 */
int
pcmk_ipc_connect(void *conn)
{
    (void) conn;

    return 0;
}
866 
867 /*
868  * Executive message handlers
869  */
870 void
pcmk_cluster_swab(void * msg)871 pcmk_cluster_swab(void *msg)
872 {
873     AIS_Message *ais_msg = msg;
874 
875     ais_trace("Performing endian conversion...");
876     ais_msg->id = swab32(ais_msg->id);
877     ais_msg->size = swab32(ais_msg->size);
878     ais_msg->is_compressed = swab32(ais_msg->is_compressed);
879     ais_msg->compressed_size = swab32(ais_msg->compressed_size);
880 
881     ais_msg->host.id = swab32(ais_msg->host.id);
882     ais_msg->host.pid = swab32(ais_msg->host.pid);
883     ais_msg->host.type = swab32(ais_msg->host.type);
884     ais_msg->host.size = swab32(ais_msg->host.size);
885     ais_msg->host.local = swab32(ais_msg->host.local);
886 
887     ais_msg->sender.id = swab32(ais_msg->sender.id);
888     ais_msg->sender.pid = swab32(ais_msg->sender.pid);
889     ais_msg->sender.type = swab32(ais_msg->sender.type);
890     ais_msg->sender.size = swab32(ais_msg->sender.size);
891     ais_msg->sender.local = swab32(ais_msg->sender.local);
892 
893     ais_msg->header.size = swab32(ais_msg->header.size);
894     ais_msg->header.id = swab32(ais_msg->header.id);
895     ais_msg->header.error = swab32(ais_msg->header.error);
896 }
897 
898 void
pcmk_cluster_callback(ais_void_ptr * message,unsigned int nodeid)899 pcmk_cluster_callback(ais_void_ptr * message, unsigned int nodeid)
900 {
901     const AIS_Message *ais_msg = message;
902 
903     ais_trace("Message from node %u (%s)", nodeid, nodeid == local_nodeid ? "local" : "remote");
904 /*  Shouldn't be required...
905     update_member(
906  	ais_msg->sender.id, membership_seq, -1, 0, ais_msg->sender.uname, NULL);
907 */
908 
909     if (ais_msg->host.size == 0 || ais_str_eq(ais_msg->host.uname, local_uname)) {
910         route_ais_message(ais_msg, FALSE);
911 
912     } else {
913         ais_trace("Discarding Msg[%d] (dest=%s:%s, from=%s:%s)",
914                   ais_msg->id, ais_dest(&(ais_msg->host)),
915                   msg_type2text(ais_msg->host.type),
916                   ais_dest(&(ais_msg->sender)), msg_type2text(ais_msg->sender.type));
917     }
918 }
919 
920 void
pcmk_cluster_id_swab(void * msg)921 pcmk_cluster_id_swab(void *msg)
922 {
923     struct crm_identify_msg_s *ais_msg = msg;
924 
925     ais_trace("Performing endian conversion...");
926     ais_msg->id = swab32(ais_msg->id);
927     ais_msg->pid = swab32(ais_msg->pid);
928     ais_msg->votes = swab32(ais_msg->votes);
929     ais_msg->processes = swab32(ais_msg->processes);
930     ais_msg->born_on = swab64(ais_msg->born_on);
931 
932     ais_msg->header.size = swab32(ais_msg->header.size);
933     ais_msg->header.id = swab32(ais_msg->header.id);
934 }
935 
936 void
pcmk_cluster_id_callback(ais_void_ptr * message,unsigned int nodeid)937 pcmk_cluster_id_callback(ais_void_ptr * message, unsigned int nodeid)
938 {
939     int changed = 0;
940     const struct crm_identify_msg_s *msg = message;
941 
942     if (nodeid != msg->id) {
943         ais_err("Invalid message: Node %u claimed to be node %d", nodeid, msg->id);
944         return;
945     }
946     ais_debug("Node update: %s (%s)", msg->uname, msg->version);
947     changed =
948         update_member(nodeid, msg->born_on, membership_seq, msg->votes, msg->processes, msg->uname,
949                       NULL, msg->version);
950 
951     if (changed) {
952         send_member_notification();
953     }
954 }
955 
/*
 * IPC response buffer: a corosync response header (8-byte aligned) followed
 * by 4096 bytes.  send_ipc_ack() fills in and sends only the header; buf is
 * presumably payload space for larger responses - confirm against other users.
 */
struct res_overlay {
    cs_ipc_header_response_t header __attribute((aligned(8)));
    char buf[4096];
};

/* Shared response buffer; stays NULL until send_ipc_ack() allocates it on first use */
struct res_overlay *res_overlay = NULL;
962 
963 static void
send_ipc_ack(void * conn)964 send_ipc_ack(void *conn)
965 {
966     if (res_overlay == NULL) {
967         ais_malloc0(res_overlay, sizeof(struct res_overlay));
968     }
969 
970     res_overlay->header.id = CRM_MESSAGE_IPC_ACK;
971     res_overlay->header.size = sizeof(cs_ipc_header_response_t);
972     res_overlay->header.error = CS_OK;
973     pcmk_api->ipc_response_send(conn, res_overlay, res_overlay->header.size);
974 }
975 
976 /* local callbacks */
977 void
pcmk_ipc(void * conn,ais_void_ptr * msg)978 pcmk_ipc(void *conn, ais_void_ptr * msg)
979 {
980     AIS_Message *mutable;
981     int type = 0;
982     gboolean transient = TRUE;
983     const AIS_Message *ais_msg = (const AIS_Message *)msg;
984     void *async_conn = conn;
985 
986     ais_trace("Message from client %p", conn);
987 
988     if (check_message_sanity(msg, ((const AIS_Message *)msg)->data) == FALSE) {
989         /* The message is corrupted - ignore */
990         send_ipc_ack(conn);
991         msg = NULL;
992         return;
993     }
994 
995     /* Make a copy of the message here and ACK it
996      * The message is only valid until a response is sent
997      * but the response must also be sent _before_ we send anything else
998      */
999 
1000     mutable = ais_msg_copy(ais_msg);
1001     AIS_ASSERT(check_message_sanity(mutable, mutable->data));
1002 
1003     type = mutable->sender.type;
1004     ais_trace
1005         ("type: %d local: %d conn: %p host type: %d ais: %d sender pid: %d child pid: %d size: %d",
1006          type, mutable->host.local, pcmk_children[type].conn, mutable->host.type, crm_msg_ais,
1007          mutable->sender.pid, pcmk_children[type].pid, ((int)SIZEOF(pcmk_children)));
1008 
1009     if (type > crm_msg_none && type < SIZEOF(pcmk_children)) {
1010         /* known child process */
1011         transient = FALSE;
1012     }
1013 #if 0
1014     /* If this check fails, the order of pcmk_children probably
1015      *   doesn't match that of the crm_ais_msg_types enum
1016      */
1017     AIS_CHECK(transient || mutable->sender.pid == pcmk_children[type].pid,
1018               ais_err("Sender: %d, child[%d]: %d", mutable->sender.pid, type,
1019                       pcmk_children[type].pid);
1020               ais_free(mutable);
1021               return);
1022 #endif
1023 
1024     if (transient == FALSE
1025         && type > crm_msg_none
1026         && mutable->host.local
1027         && pcmk_children[type].conn == NULL && mutable->host.type == crm_msg_ais) {
1028         AIS_CHECK(mutable->sender.type != mutable->sender.pid,
1029                   ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
1030 
1031         ais_info("Recorded connection %p for %s/%d",
1032                  conn, pcmk_children[type].name, pcmk_children[type].pid);
1033         pcmk_children[type].conn = conn;
1034         pcmk_children[type].async_conn = async_conn;
1035 
1036         /* Make sure they have the latest membership */
1037         if (pcmk_children[type].flags & crm_flag_members) {
1038             char *update = pcmk_generate_membership_data();
1039 
1040             g_hash_table_replace(membership_notify_list, async_conn, async_conn);
1041             ais_info("Sending membership update " U64T " to %s",
1042                      membership_seq, pcmk_children[type].name);
1043             send_client_msg(async_conn, crm_class_members, crm_msg_none, update);
1044         }
1045 
1046     } else if (transient) {
1047         AIS_CHECK(mutable->sender.type == mutable->sender.pid,
1048                   ais_err("Pid=%d, type=%d", mutable->sender.pid, mutable->sender.type));
1049         g_hash_table_replace(ipc_client_list, async_conn, GUINT_TO_POINTER(mutable->sender.pid));
1050     }
1051 
1052     mutable->sender.id = local_nodeid;
1053     mutable->sender.size = local_uname_len;
1054     memset(mutable->sender.uname, 0, MAX_NAME);
1055     memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
1056 
1057     route_ais_message(mutable, TRUE);
1058     send_ipc_ack(conn);
1059     msg = NULL;
1060     ais_free(mutable);
1061 }
1062 
/*
 * Shutdown hook for the plugin.
 *
 * With use_mcp set, nothing is stopped here: the function refuses (-1) while
 * the crmd or stonith-ng child still has a connection recorded, and returns
 * 0 otherwise.
 *
 * Without use_mcp, the children in pcmk_children[] are stopped in descending
 * start_seq order.  Progress is kept in static variables, so each call picks
 * up where the previous one left off.
 *
 * Returns 0 once shutdown is complete, or -1 while a child is still running
 * (or, with use_mcp, while Pacemaker is still connected).
 * NOTE(review): presumably corosync calls this hook again after a -1 - confirm
 * against the corosync service engine API.
 */
int
pcmk_shutdown(void)
{
    int lpc = 0;
    static int phase = 0;       /* start_seq currently being stopped; 0 until shutdown begins */
    static int max_wait = 0;    /* remaining 30s log intervals before SIGKILL is considered */
    static time_t next_log = 0; /* earliest time the next progress/refusal message may be logged */
    static int max = SIZEOF(pcmk_children);

    if (use_mcp) {
        if (pcmk_children[crm_msg_crmd].conn || pcmk_children[crm_msg_stonith_ng].conn) {
            time_t now = time(NULL);

            /* Rate-limit the refusal message to one every 300 seconds */
            if (now > next_log) {
                next_log = now + 300;
                ais_notice
                    ("Preventing Corosync shutdown.  Please ensure Pacemaker is stopped first.");
            }
            return -1;
        }
        ais_notice("Unloading Pacemaker plugin");
        return 0;
    }

    /* First call: start from the highest possible sequence number */
    if (phase == 0) {
        ais_notice("Shutting down Pacemaker");
        phase = max;
    }

    wait_active = FALSE;        /* stop the wait loop */

    for (; phase > 0; phase--) {
        /* don't stop anything with start_seq < 1 */

        for (lpc = max - 1; lpc >= 0; lpc--) {
            /* Only handle the children belonging to the current phase */
            if (phase != pcmk_children[lpc].start_seq) {
                continue;
            }

            if (pcmk_children[lpc].pid) {
                pid_t pid = 0;
                int status = 0;
                time_t now = time(NULL);

                /* respawn doubles as the "SIGTERM not sent yet" marker: it is
                 * cleared here so the signal is only sent once per child
                 */
                if (pcmk_children[lpc].respawn) {
                    max_wait = 5;       /* 5 * 30s = 2.5 minutes... plenty once the crmd is gone */
                    next_log = now + 30;
                    pcmk_children[lpc].respawn = FALSE;
                    stop_child(&(pcmk_children[lpc]), SIGTERM);
                }

                /* Non-blocking check for whether the child has exited */
                pid = wait4(pcmk_children[lpc].pid, &status, WNOHANG, NULL);
                if (pid < 0) {
                    ais_perror("Call to wait4(%s/%d) failed - treating it as stopped",
                               pcmk_children[lpc].name, pcmk_children[lpc].pid);

                } else if (pid == 0) {
                    /* Still running: log at most every 30 seconds */
                    if (now >= next_log) {
                        max_wait--;
                        next_log = now + 30;
                        ais_notice("Still waiting for %s (pid=%d, seq=%d) to terminate...",
                                   pcmk_children[lpc].name, pcmk_children[lpc].pid,
                                   pcmk_children[lpc].start_seq);
                        /* SIGKILL is only used for phases below the crmd's start_seq */
                        if (max_wait <= 0 && phase < pcmk_children[crm_msg_crmd].start_seq) {
                            ais_err("Child %s taking too long to terminate, sending SIGKILL",
                                    pcmk_children[lpc].name);
                            stop_child(&(pcmk_children[lpc]), SIGKILL);
                        }
                    }
                    /* Return control to corosync */
                    return -1;
                }
            }

            /* cleanup: reached when the child had no pid, has exited, or wait4() failed */
            ais_notice("%s confirmed stopped", pcmk_children[lpc].name);
            pcmk_children[lpc].async_conn = NULL;
            pcmk_children[lpc].conn = NULL;
            pcmk_children[lpc].pid = 0;
        }
    }

    /* Broadcast our identity once more now that the process list has changed */
    send_cluster_id();
    ais_notice("Shutdown complete");
    /* TODO: Add back the logsys flush call once it's written */

    return 0;
}
1151 
/* Accumulator handed to member_loop_fn() as user_data while the membership string is built */
struct member_loop_data {
    char *string;               /* replaced by the result of each append_member() call */
};
1155 
1156 static void
member_vote_count_fn(gpointer key,gpointer value,gpointer user_data)1157 member_vote_count_fn(gpointer key, gpointer value, gpointer user_data)
1158 {
1159     crm_node_t *node = value;
1160 
1161     if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
1162         plugin_has_votes += node->votes;
1163     }
1164 }
1165 
1166 void
member_loop_fn(gpointer key,gpointer value,gpointer user_data)1167 member_loop_fn(gpointer key, gpointer value, gpointer user_data)
1168 {
1169     crm_node_t *node = value;
1170     struct member_loop_data *data = user_data;
1171 
1172     ais_trace("Dumping node %u", node->id);
1173     data->string = append_member(data->string, node);
1174 }
1175 
1176 char *
pcmk_generate_membership_data(void)1177 pcmk_generate_membership_data(void)
1178 {
1179     int size = 0;
1180     struct member_loop_data data;
1181 
1182     size = 256;
1183     ais_malloc0(data.string, size);
1184 
1185     /* Ensure the list of active processes is up-to-date */
1186     update_member(local_nodeid, 0, 0, -1, get_process_list(), local_uname, CRM_NODE_MEMBER, NULL);
1187 
1188     plugin_has_votes = 0;
1189     g_hash_table_foreach(membership_list, member_vote_count_fn, NULL);
1190     if (plugin_has_votes > plugin_expected_votes) {
1191         update_expected_votes(plugin_has_votes);
1192     }
1193 
1194     snprintf(data.string, size,
1195              "<nodes id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\">",
1196              membership_seq, plugin_has_quorum()? "true" : "false",
1197              plugin_expected_votes, plugin_has_votes);
1198 
1199     g_hash_table_foreach(membership_list, member_loop_fn, &data);
1200     size = strlen(data.string);
1201     data.string = realloc_safe(data.string, size + 9);       /* 9 = </nodes> + nul */
1202     sprintf(data.string + size, "</nodes>");
1203     return data.string;
1204 }
1205 
1206 void
pcmk_nodes(void * conn,ais_void_ptr * msg)1207 pcmk_nodes(void *conn, ais_void_ptr * msg)
1208 {
1209     char *data = pcmk_generate_membership_data();
1210     void *async_conn = conn;
1211 
1212     /* send the ACK before we send any other messages
1213      * - but after we no longer need to access the message
1214      */
1215     send_ipc_ack(conn);
1216     msg = NULL;
1217 
1218     if (async_conn) {
1219         send_client_msg(async_conn, crm_class_members, crm_msg_none, data);
1220     }
1221     ais_free(data);
1222 }
1223 
1224 void
pcmk_remove_member(void * conn,ais_void_ptr * msg)1225 pcmk_remove_member(void *conn, ais_void_ptr * msg)
1226 {
1227     const AIS_Message *ais_msg = msg;
1228     char *data = get_ais_data(ais_msg);
1229 
1230     send_ipc_ack(conn);
1231     msg = NULL;
1232 
1233     if (data != NULL) {
1234         char *bcast = ais_concat("remove-peer", data, ':');
1235 
1236         send_plugin_msg(crm_msg_ais, NULL, bcast);
1237         ais_info("Sent: %s", bcast);
1238         ais_free(bcast);
1239     }
1240 
1241     ais_free(data);
1242 }
1243 
1244 static void
send_quorum_details(void * conn)1245 send_quorum_details(void *conn)
1246 {
1247     int size = 256;
1248     char *data = NULL;
1249 
1250     ais_malloc0(data, size);
1251 
1252     snprintf(data, size, "<quorum id=\"" U64T "\" quorate=\"%s\" expected=\"%u\" actual=\"%u\"/>",
1253              membership_seq, plugin_has_quorum()? "true" : "false",
1254              plugin_expected_votes, plugin_has_votes);
1255 
1256     send_client_msg(conn, crm_class_quorum, crm_msg_none, data);
1257     ais_free(data);
1258 }
1259 
1260 void
pcmk_quorum(void * conn,ais_void_ptr * msg)1261 pcmk_quorum(void *conn, ais_void_ptr * msg)
1262 {
1263     char *dummy = NULL;
1264     const AIS_Message *ais_msg = msg;
1265     char *data = get_ais_data(ais_msg);
1266 
1267     send_ipc_ack(conn);
1268     msg = NULL;
1269 
1270     /* Make sure the current number of votes is accurate */
1271     dummy = pcmk_generate_membership_data();
1272     ais_free(dummy);
1273 
1274     /* Calls without data just want the current quorum details */
1275     if (data != NULL && strlen(data) > 0) {
1276         int value = ais_get_int(data, NULL);
1277 
1278         update_expected_votes(value);
1279     }
1280 
1281     send_quorum_details(conn);
1282     ais_free(data);
1283 }
1284 
1285 void
pcmk_notify(void * conn,ais_void_ptr * msg)1286 pcmk_notify(void *conn, ais_void_ptr * msg)
1287 {
1288     const AIS_Message *ais_msg = msg;
1289     char *data = get_ais_data(ais_msg);
1290     void *async_conn = conn;
1291 
1292     int enable = 0;
1293     int sender = ais_msg->sender.pid;
1294 
1295     send_ipc_ack(conn);
1296     msg = NULL;
1297 
1298     if (ais_str_eq("true", data)) {
1299         enable = 1;
1300     }
1301 
1302     ais_info("%s node notifications for child %d (%p)",
1303              enable ? "Enabling" : "Disabling", sender, async_conn);
1304     if (enable) {
1305         g_hash_table_replace(membership_notify_list, async_conn, async_conn);
1306     } else {
1307         g_hash_table_remove(membership_notify_list, async_conn);
1308     }
1309     ais_free(data);
1310 }
1311 
1312 void
pcmk_nodeid(void * conn,ais_void_ptr * msg)1313 pcmk_nodeid(void *conn, ais_void_ptr * msg)
1314 {
1315     static int counter = 0;
1316     struct crm_ais_nodeid_resp_s resp;
1317 
1318     ais_trace("Sending local nodeid: %d to %p[%d]", local_nodeid, conn, counter);
1319 
1320     resp.header.id = crm_class_nodeid;
1321     resp.header.size = sizeof(struct crm_ais_nodeid_resp_s);
1322     resp.header.error = CS_OK;
1323     resp.id = local_nodeid;
1324     resp.counter = counter++;
1325     memset(resp.uname, 0, MAX_NAME);
1326     memcpy(resp.uname, local_uname, local_uname_len);
1327     memset(resp.cname, 0, MAX_NAME);
1328     memcpy(resp.cname, local_cname, local_cname_len);
1329 
1330     pcmk_api->ipc_response_send(conn, &resp, resp.header.size);
1331 }
1332 
1333 static gboolean
ghash_send_update(gpointer key,gpointer value,gpointer data)1334 ghash_send_update(gpointer key, gpointer value, gpointer data)
1335 {
1336     if (send_client_msg(value, crm_class_members, crm_msg_none, data) != 0) {
1337         /* remove it */
1338         return TRUE;
1339     }
1340     return FALSE;
1341 }
1342 
1343 void
send_member_notification(void)1344 send_member_notification(void)
1345 {
1346     char *update = pcmk_generate_membership_data();
1347 
1348     ais_info("Sending membership update " U64T " to %d children",
1349              membership_seq, g_hash_table_size(membership_notify_list));
1350 
1351     g_hash_table_foreach_remove(membership_notify_list, ghash_send_update, update);
1352     ais_free(update);
1353 }
1354 
/*
 * Check the header and size bookkeeping of an AIS message.
 *
 * msg:  the message to check.
 * data: the message payload; when non-NULL and the message is not
 *       compressed, its string length is compared with the recorded
 *       payload length.
 *
 * Returns TRUE when the message passes all checks, FALSE otherwise.  Every
 * failure is logged, and a summary line describing the message is logged on
 * all paths that reach the end of the function.
 *
 * NOTE(review): AIS_CHECK is defined elsewhere; its second argument is
 * presumably executed when the condition is false - confirm.
 */
gboolean
check_message_sanity(const AIS_Message * msg, const char *data)
{
    gboolean sane = TRUE;
    gboolean repaired = FALSE;  /* never set to TRUE below, so the "Repaired" branch cannot run */
    int dest = msg->host.type;
    /* Payload size implied by the transport header */
    int tmp_size = msg->header.size - sizeof(AIS_Message);

    if (sane && msg->header.size == 0) {
        ais_err("Message with no size");
        sane = FALSE;
    }

    if (sane && msg->header.error != CS_OK) {
        ais_err("Message header contains an error: %d", msg->header.error);
        sane = FALSE;
    }

    /* Evaluated regardless of 'sane': a message no larger than the fixed
     * AIS_Message part cannot carry a payload
     */
    AIS_CHECK(msg->header.size > sizeof(AIS_Message),
              ais_err("Message %d size too small: %d < %llu",
                      msg->header.id, msg->header.size,
                      (unsigned long long) sizeof(AIS_Message));
              return FALSE);

    /* A mismatch between the recorded and the implied payload size is only
     * warned about: the assignment below leaves 'sane' unchanged
     */
    if (sane && ais_data_len(msg) != tmp_size) {
        ais_warn("Message payload size is incorrect: expected %d, got %d", ais_data_len(msg),
                 tmp_size);
        sane = TRUE;
    }

    if (sane && ais_data_len(msg) == 0) {
        ais_err("Message with no payload");
        sane = FALSE;
    }

    /* For uncompressed payloads the recorded length must equal the string
     * length including the terminating nul
     */
    if (sane && data && msg->is_compressed == FALSE) {
        int str_size = strlen(data) + 1;

        if (ais_data_len(msg) != str_size) {
            int lpc = 0;

            ais_err("Message payload is corrupted: expected %d bytes, got %d",
                    ais_data_len(msg), str_size);
            sane = FALSE;
            /* Trace the bytes from shortly before the unexpected nul up to msg->size */
            for (lpc = (str_size - 10); lpc < msg->size; lpc++) {
                if (lpc < 0) {
                    lpc = 0;
                }
                ais_trace("bad_data[%d]: %d / '%c'", lpc, data[lpc], data[lpc]);
            }
        }
    }

    if (sane == FALSE) {
        /* 'sane' is known to be FALSE here, so the check exists for its logging */
        AIS_CHECK(sane,
                  ais_err
                  ("Invalid message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
                   msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
                   msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
                   ais_data_len(msg), msg->header.size));

    } else if (repaired) {
        ais_err
            ("Repaired message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
             msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
             msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
             ais_data_len(msg), msg->header.size);
    } else {
        ais_trace
            ("Verified message %d: (dest=%s:%s, from=%s:%s.%d, compressed=%d, size=%d, total=%d)",
             msg->id, ais_dest(&(msg->host)), msg_type2text(dest), ais_dest(&(msg->sender)),
             msg_type2text(msg->sender.type), msg->sender.pid, msg->is_compressed,
             ais_data_len(msg), msg->header.size);
    }
    return sane;
}
1431 
1432 static int delivered_transient = 0;
1433 static void
deliver_transient_msg(gpointer key,gpointer value,gpointer user_data)1434 deliver_transient_msg(gpointer key, gpointer value, gpointer user_data)
1435 {
1436     int pid = GPOINTER_TO_INT(value);
1437     AIS_Message *mutable = user_data;
1438 
1439     if (pid == mutable->host.type) {
1440         int rc = send_client_ipc(key, mutable);
1441 
1442         delivered_transient++;
1443 
1444         ais_info("Sent message to %s.%d (rc=%d)", ais_dest(&(mutable->host)), pid, rc);
1445         if (rc != 0) {
1446             ais_warn("Sending message to %s.%d failed (rc=%d)",
1447                      ais_dest(&(mutable->host)), pid, rc);
1448             log_ais_message(LOG_DEBUG, mutable);
1449         }
1450     }
1451 }
1452 
1453 gboolean
route_ais_message(const AIS_Message * msg,gboolean local_origin)1454 route_ais_message(const AIS_Message * msg, gboolean local_origin)
1455 {
1456     int rc = 0;
1457     int dest = msg->host.type;
1458     const char *reason = "unknown";
1459     AIS_Message *mutable = ais_msg_copy(msg);
1460     static int service_id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
1461 
1462     ais_trace("Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d)",
1463               mutable->id, ais_dest(&(mutable->host)), msg_type2text(dest),
1464               ais_dest(&(mutable->sender)), msg_type2text(mutable->sender.type),
1465               mutable->sender.pid, local_origin ? "false" : "true", ais_data_len((mutable)));
1466 
1467     if (local_origin == FALSE) {
1468         if (mutable->host.size == 0 || ais_str_eq(local_uname, mutable->host.uname)) {
1469             mutable->host.local = TRUE;
1470         }
1471     }
1472 
1473     if (check_message_sanity(mutable, mutable->data) == FALSE) {
1474         /* Don't send this message to anyone */
1475         rc = 1;
1476         goto bail;
1477     }
1478 
1479     if (mutable->host.local) {
1480         void *conn = NULL;
1481         const char *lookup = NULL;
1482         int children_index = 0;
1483 
1484         if (dest == crm_msg_ais) {
1485             process_ais_message(mutable);
1486             goto bail;
1487 
1488         } else if (dest == crm_msg_lrmd) {
1489             /* lrmd messages are routed via the crm */
1490             dest = crm_msg_crmd;
1491 
1492         } else if (dest == crm_msg_te) {
1493             /* te messages are routed via the crm */
1494             dest = crm_msg_crmd;
1495 
1496         } else if (dest >= SIZEOF(pcmk_children)) {
1497             /* Transient client */
1498 
1499             delivered_transient = 0;
1500             g_hash_table_foreach(ipc_client_list, deliver_transient_msg, mutable);
1501             if (delivered_transient) {
1502                 ais_trace("Sent message to %d transient clients: %d", delivered_transient, dest);
1503                 goto bail;
1504 
1505             } else {
1506                 /* try the crmd */
1507                 ais_trace("Sending message to transient client %d via crmd", dest);
1508                 dest = crm_msg_crmd;
1509             }
1510 
1511         } else if (dest == 0) {
1512             ais_err("Invalid destination: %d", dest);
1513             log_ais_message(LOG_ERR, mutable);
1514             log_printf(LOG_ERR, "%s", get_ais_data(mutable));
1515             rc = 1;
1516             goto bail;
1517         }
1518 
1519         lookup = msg_type2text(dest);
1520 
1521         if (dest == crm_msg_pe && ais_str_eq(pcmk_children[7].name, lookup)) {
1522             children_index = 7;
1523 
1524         } else {
1525             children_index = dest;
1526         }
1527 
1528         conn = pcmk_children[children_index].async_conn;
1529 
1530         if (mutable->header.id == service_id) {
1531             mutable->header.id = 0;     /* reset this back to zero for IPC messages */
1532 
1533         } else if (mutable->header.id != 0) {
1534             ais_err("reset header id back to zero from %d", mutable->header.id);
1535             mutable->header.id = 0;     /* reset this back to zero for IPC messages */
1536         }
1537 
1538         reason = "ipc delivery failed";
1539         rc = send_client_ipc(conn, mutable);
1540 
1541     } else if (local_origin) {
1542         /* forward to other hosts */
1543         ais_trace("Forwarding to cluster");
1544         reason = "cluster delivery failed";
1545         rc = send_plugin_msg_raw(mutable);
1546     }
1547 
1548     if (rc != 0) {
1549         ais_warn("Sending message to %s.%s failed: %s (rc=%d)",
1550                  ais_dest(&(mutable->host)), msg_type2text(dest), reason, rc);
1551         log_ais_message(LOG_DEBUG, mutable);
1552     }
1553 
1554   bail:
1555     ais_free(mutable);
1556     return rc == 0 ? TRUE : FALSE;
1557 }
1558 
1559 int
send_plugin_msg_raw(const AIS_Message * ais_msg)1560 send_plugin_msg_raw(const AIS_Message * ais_msg)
1561 {
1562     int rc = 0;
1563     struct iovec iovec;
1564     static uint32_t msg_id = 0;
1565     AIS_Message *mutable = ais_msg_copy(ais_msg);
1566 
1567     AIS_ASSERT(local_nodeid != 0);
1568     AIS_ASSERT(ais_msg->header.size == (sizeof(AIS_Message) + ais_data_len(ais_msg)));
1569 
1570     if (mutable->id == 0) {
1571         msg_id++;
1572         AIS_CHECK(msg_id != 0 /* detect wrap-around */ ,
1573                   msg_id++; ais_err("Message ID wrapped around"));
1574         mutable->id = msg_id;
1575     }
1576 
1577     mutable->header.error = CS_OK;
1578     mutable->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 0);
1579 
1580     mutable->sender.id = local_nodeid;
1581     mutable->sender.size = local_uname_len;
1582     memset(mutable->sender.uname, 0, MAX_NAME);
1583     memcpy(mutable->sender.uname, local_uname, mutable->sender.size);
1584 
1585     iovec.iov_base = (char *)mutable;
1586     iovec.iov_len = mutable->header.size;
1587 
1588     ais_trace("Sending message (size=%u)", (unsigned int)iovec.iov_len);
1589     rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
1590 
1591     if (rc == 0 && mutable->is_compressed == FALSE) {
1592         ais_trace("Message sent: %.80s", mutable->data);
1593     }
1594 
1595     AIS_CHECK(rc == 0, ais_err("Message not sent (%d): %.120s", rc, mutable->data));
1596 
1597     ais_free(mutable);
1598     return rc;
1599 }
1600 
/* Smaller of two values.  The whole expansion is parenthesised so it can be
 * used inside larger expressions; note that each argument may still be
 * evaluated twice, so avoid arguments with side effects.
 */
#define min(x,y) (((x) < (y)) ? (x) : (y))
1602 
1603 void
send_cluster_id(void)1604 send_cluster_id(void)
1605 {
1606     int rc = 0;
1607     int len = 0;
1608     time_t now = time(NULL);
1609     struct iovec iovec;
1610     struct crm_identify_msg_s *msg = NULL;
1611 
1612     static time_t started = 0;
1613     static uint64_t first_seq = 0;
1614 
1615     AIS_ASSERT(local_nodeid != 0);
1616 
1617     if (started == 0) {
1618         started = now;
1619         first_seq = membership_seq;
1620     }
1621 
1622     if (local_born_on == 0) {
1623         if (started + 15 < now) {
1624             ais_debug("Born-on set to: " U64T " (age)", first_seq);
1625             local_born_on = first_seq;
1626 
1627         } else if (have_reliable_membership_id) {
1628             ais_debug("Born-on set to: " U64T " (peer)", membership_seq);
1629             local_born_on = membership_seq;
1630 
1631         } else {
1632             ais_debug("Leaving born-on unset: " U64T, membership_seq);
1633         }
1634     }
1635 
1636     ais_malloc0(msg, sizeof(struct crm_identify_msg_s));
1637     msg->header.size = sizeof(struct crm_identify_msg_s);
1638 
1639     msg->id = local_nodeid;
1640     /* msg->header.error = CS_OK; */
1641     msg->header.id = SERVICE_ID_MAKE(PCMK_SERVICE_ID, 1);
1642 
1643     len = min(local_uname_len, MAX_NAME - 1);
1644     memset(msg->uname, 0, MAX_NAME);
1645     memcpy(msg->uname, local_uname, len);
1646 
1647     len = min(strlen(VERSION), MAX_NAME - 1);
1648     memset(msg->version, 0, MAX_NAME);
1649     memcpy(msg->version, VERSION, len);
1650 
1651     msg->votes = 1;
1652     msg->pid = getpid();
1653     msg->processes = get_process_list();
1654     msg->born_on = local_born_on;
1655 
1656     ais_debug("Local update: id=%u, born=" U64T ", seq=" U64T "",
1657               local_nodeid, local_born_on, membership_seq);
1658     update_member(local_nodeid, local_born_on, membership_seq, msg->votes, msg->processes, NULL,
1659                   NULL, VERSION);
1660 
1661     iovec.iov_base = (char *)msg;
1662     iovec.iov_len = msg->header.size;
1663 
1664     rc = pcmk_api->totem_mcast(&iovec, 1, TOTEMPG_SAFE);
1665 
1666     AIS_CHECK(rc == 0, ais_err("Message not sent (%d)", rc));
1667 
1668     ais_free(msg);
1669 }
1670 
1671 static gboolean
ghash_send_removal(gpointer key,gpointer value,gpointer data)1672 ghash_send_removal(gpointer key, gpointer value, gpointer data)
1673 {
1674     send_quorum_details(value);
1675     if (send_client_msg(value, crm_class_rmpeer, crm_msg_none, data) != 0) {
1676         /* remove it */
1677         return TRUE;
1678     }
1679     return FALSE;
1680 }
1681 
1682 void
ais_remove_peer(char * node_id)1683 ais_remove_peer(char *node_id)
1684 {
1685     uint32_t id = ais_get_int(node_id, NULL);
1686     crm_node_t *node = g_hash_table_lookup(membership_list, GUINT_TO_POINTER(id));
1687 
1688     if (node == NULL) {
1689         ais_info("Peer %u is unknown", id);
1690 
1691     } else if (ais_str_eq(CRM_NODE_MEMBER, node->state)) {
1692         ais_warn("Peer %u/%s is still active", id, node->uname);
1693 
1694     } else if (g_hash_table_remove(membership_list, GUINT_TO_POINTER(id))) {
1695         plugin_expected_votes--;
1696         ais_notice("Removed dead peer %u from the membership list", id);
1697         ais_info("Sending removal of %u to %d children",
1698                  id, g_hash_table_size(membership_notify_list));
1699 
1700         g_hash_table_foreach_remove(membership_notify_list, ghash_send_removal, node_id);
1701 
1702     } else {
1703         ais_warn("Peer %u/%s was not removed", id, node->uname);
1704     }
1705 }
1706 
1707 void
ais_remove_peer_by_name(const char * node_name)1708 ais_remove_peer_by_name(const char *node_name)
1709 {
1710     GHashTableIter iter;
1711     gpointer key = 0;
1712     crm_node_t *node = NULL;
1713     GList *node_list = NULL;
1714 
1715     g_hash_table_iter_init(&iter, membership_list);
1716 
1717     while (g_hash_table_iter_next(&iter, &key, (void **)&node)) {
1718         if (ais_str_eq(node_name, node->uname)) {
1719             uint32_t node_id = GPOINTER_TO_UINT(key);
1720             char *node_id_s = NULL;
1721 
1722             ais_malloc0(node_id_s, 32);
1723             snprintf(node_id_s, 31, "%u", node_id);
1724             node_list = g_list_append(node_list, node_id_s);
1725         }
1726     }
1727 
1728     if (node_list) {
1729         GList *gIter = NULL;
1730 
1731         for (gIter = node_list; gIter != NULL; gIter = gIter->next) {
1732             char *node_id_s = gIter->data;
1733 
1734             ais_remove_peer(node_id_s);
1735         }
1736         g_list_free_full(node_list, free);
1737 
1738     } else {
1739         ais_warn("Peer %s is unkown", node_name);
1740     }
1741 }
1742 
1743 gboolean
process_ais_message(const AIS_Message * msg)1744 process_ais_message(const AIS_Message * msg)
1745 {
1746     int len = ais_data_len(msg);
1747     char *data = get_ais_data(msg);
1748 
1749     do_ais_log(LOG_DEBUG,
1750                "Msg[%d] (dest=%s:%s, from=%s:%s.%d, remote=%s, size=%d): %.90s",
1751                msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
1752                ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
1753                msg->sender.pid,
1754                msg->sender.uname == local_uname ? "false" : "true", ais_data_len(msg), data);
1755 
1756     if (data && len > 12 && strncmp("remove-peer:", data, 12) == 0) {
1757         char *node = data + 12;
1758 
1759         ais_remove_peer_by_name(node);
1760     }
1761 
1762     ais_free(data);
1763     return TRUE;
1764 }
1765 
1766 static void
member_dump_fn(gpointer key,gpointer value,gpointer user_data)1767 member_dump_fn(gpointer key, gpointer value, gpointer user_data)
1768 {
1769     crm_node_t *node = value;
1770 
1771     ais_info(" node id:%u, uname=%s state=%s processes=%.16x born=" U64T " seen=" U64T
1772              " addr=%s version=%s", node->id, node->uname ? node->uname : "-unknown-", node->state,
1773              node->processes, node->born, node->last_seen, node->addr ? node->addr : "-unknown-",
1774              node->version ? node->version : "-unknown-");
1775 }
1776 
1777 void
pcmk_exec_dump(void)1778 pcmk_exec_dump(void)
1779 {
1780     /* Called after SIG_USR2 */
1781     process_ais_conf();
1782     ais_info("Local id: %u, uname: %s, born: " U64T, local_nodeid, local_uname, local_born_on);
1783     ais_info("Membership id: " U64T ", quorate: %s, expected: %u, actual: %u",
1784              membership_seq, plugin_has_quorum()? "true" : "false",
1785              plugin_expected_votes, plugin_has_votes);
1786 
1787     g_hash_table_foreach(membership_list, member_dump_fn, NULL);
1788 }
1789