1 /*	$NetBSD: clvmd-corosync.c,v 1.1.1.2 2009/12/02 00:27:02 haad Exp $	*/
2 
3 /*
4  * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
5  *
6  * This file is part of LVM2.
7  *
8  * This copyrighted material is made available to anyone wishing to use,
9  * modify, copy, or redistribute it subject to the terms and conditions
10  * of the GNU Lesser General Public License v.2.1.
11  *
12  * You should have received a copy of the GNU Lesser General Public License
13  * along with this program; if not, write to the Free Software Foundation,
14  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16 
17 /*
18  * This provides the interface between clvmd and corosync/DLM as the cluster
19  * and lock manager.
20  */
21 
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24 
25 #include <configure.h>
26 #include <pthread.h>
27 #include <sys/types.h>
28 #include <sys/utsname.h>
29 #include <sys/ioctl.h>
30 #include <sys/socket.h>
31 #include <sys/stat.h>
32 #include <sys/file.h>
33 #include <sys/socket.h>
34 #include <netinet/in.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <stdint.h>
38 #include <signal.h>
39 #include <fcntl.h>
40 #include <string.h>
41 #include <stddef.h>
42 #include <stdint.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <utmpx.h>
46 #include <syslog.h>
47 #include <assert.h>
48 #include <libdevmapper.h>
49 
50 #include <corosync/corotypes.h>
51 #include <corosync/cpg.h>
52 #include <corosync/quorum.h>
53 #include <corosync/confdb.h>
54 #include <libdlm.h>
55 
56 #include "locking.h"
57 #include "lvm-logging.h"
58 #include "clvm.h"
59 #include "clvmd-comms.h"
60 #include "lvm-functions.h"
61 #include "clvmd.h"
62 
/* Name of the DLM lockspace that holds clvmd's LV & VG locks */
64 #define LOCKSPACE_NAME "clvmd"
65 
66 static void corosync_cpg_deliver_callback (cpg_handle_t handle,
67 				  const struct cpg_name *groupName,
68 				  uint32_t nodeid,
69 				  uint32_t pid,
70 				  void *msg,
71 				  size_t msg_len);
72 static void corosync_cpg_confchg_callback(cpg_handle_t handle,
73 				 const struct cpg_name *groupName,
74 				 const struct cpg_address *member_list, size_t member_list_entries,
75 				 const struct cpg_address *left_list, size_t left_list_entries,
76 				 const struct cpg_address *joined_list, size_t joined_list_entries);
77 static void _cluster_closedown(void);
78 
79 /* Hash list of nodes in the cluster */
80 static struct dm_hash_table *node_hash;
81 
82 /* Number of active nodes */
83 static int num_nodes;
84 static unsigned int our_nodeid;
85 
86 static struct local_client *cluster_client;
87 
88 /* Corosync handles */
89 static cpg_handle_t cpg_handle;
90 static quorum_handle_t quorum_handle;
91 
92 /* DLM Handle */
93 static dlm_lshandle_t *lockspace;
94 
95 static struct cpg_name cpg_group_name;
96 
97 /* Corosync callback structs */
98 cpg_callbacks_t corosync_cpg_callbacks = {
99 	.cpg_deliver_fn =            corosync_cpg_deliver_callback,
100 	.cpg_confchg_fn =            corosync_cpg_confchg_callback,
101 };
102 
103 quorum_callbacks_t quorum_callbacks = {
104 	.quorum_notify_fn = NULL,
105 };
106 
/* Per-node record stored in node_hash */
struct node_info {
	/* Last known state of the node; NODE_CLVMD means clvmd is running there */
	enum {
		NODE_UNKNOWN,
		NODE_DOWN,
		NODE_UP,
		NODE_CLVMD
	} state;

	/* Corosync node ID; also used as the hash key */
	int nodeid;
};
112 
113 
114 /* Set errno to something approximating the right value and return 0 or -1 */
115 static int cs_to_errno(cs_error_t err)
116 {
117 	switch(err)
118 	{
119 	case CS_OK:
120 		return 0;
121         case CS_ERR_LIBRARY:
122 		errno = EINVAL;
123 		break;
124         case CS_ERR_VERSION:
125 		errno = EINVAL;
126 		break;
127         case CS_ERR_INIT:
128 		errno = EINVAL;
129 		break;
130         case CS_ERR_TIMEOUT:
131 		errno = ETIME;
132 		break;
133         case CS_ERR_TRY_AGAIN:
134 		errno = EAGAIN;
135 		break;
136         case CS_ERR_INVALID_PARAM:
137 		errno = EINVAL;
138 		break;
139         case CS_ERR_NO_MEMORY:
140 		errno = ENOMEM;
141 		break;
142         case CS_ERR_BAD_HANDLE:
143 		errno = EINVAL;
144 		break;
145         case CS_ERR_BUSY:
146 		errno = EBUSY;
147 		break;
148         case CS_ERR_ACCESS:
149 		errno = EPERM;
150 		break;
151         case CS_ERR_NOT_EXIST:
152 		errno = ENOENT;
153 		break;
154         case CS_ERR_NAME_TOO_LONG:
155 		errno = ENAMETOOLONG;
156 		break;
157         case CS_ERR_EXIST:
158 		errno = EEXIST;
159 		break;
160         case CS_ERR_NO_SPACE:
161 		errno = ENOSPC;
162 		break;
163         case CS_ERR_INTERRUPT:
164 		errno = EINTR;
165 		break;
166 	case CS_ERR_NAME_NOT_FOUND:
167 		errno = ENOENT;
168 		break;
169         case CS_ERR_NO_RESOURCES:
170 		errno = ENOMEM;
171 		break;
172         case CS_ERR_NOT_SUPPORTED:
173 		errno = EOPNOTSUPP;
174 		break;
175         case CS_ERR_BAD_OPERATION:
176 		errno = EINVAL;
177 		break;
178         case CS_ERR_FAILED_OPERATION:
179 		errno = EIO;
180 		break;
181         case CS_ERR_MESSAGE_ERROR:
182 		errno = EIO;
183 		break;
184         case CS_ERR_QUEUE_FULL:
185 		errno = EXFULL;
186 		break;
187         case CS_ERR_QUEUE_NOT_AVAILABLE:
188 		errno = EINVAL;
189 		break;
190         case CS_ERR_BAD_FLAGS:
191 		errno = EINVAL;
192 		break;
193         case CS_ERR_TOO_BIG:
194 		errno = E2BIG;
195 		break;
196         case CS_ERR_NO_SECTIONS:
197 		errno = ENOMEM;
198 		break;
199 	default:
200 		errno = EINVAL;
201 		break;
202 	}
203 	return -1;
204 }
205 
/*
 * Render a csid as a decimal node ID for use in log messages.
 *
 * The first sizeof(int) bytes of csid are read as an int.  The returned
 * string lives in a static buffer and is overwritten by the next call.
 */
static char *print_corosync_csid(const char *csid)
{
	static char text[128];
	int node;

	memcpy(&node, csid, sizeof(node));
	snprintf(text, sizeof(text), "%d", node);

	return text;
}
215 
216 static void corosync_cpg_deliver_callback (cpg_handle_t handle,
217 				  const struct cpg_name *groupName,
218 				  uint32_t nodeid,
219 				  uint32_t pid,
220 				  void *msg,
221 				  size_t msg_len)
222 {
223 	int target_nodeid;
224 
225 	memcpy(&target_nodeid, msg, COROSYNC_CSID_LEN);
226 
227 	DEBUGLOG("%u got message from nodeid %d for %d. len %zd\n",
228 		 our_nodeid, nodeid, target_nodeid, msg_len-4);
229 
230 	if (nodeid != our_nodeid)
231 		if (target_nodeid == our_nodeid || target_nodeid == 0)
232 			process_message(cluster_client, (char *)msg+COROSYNC_CSID_LEN,
233 					msg_len-COROSYNC_CSID_LEN, (char*)&nodeid);
234 }
235 
236 static void corosync_cpg_confchg_callback(cpg_handle_t handle,
237 				 const struct cpg_name *groupName,
238 				 const struct cpg_address *member_list, size_t member_list_entries,
239 				 const struct cpg_address *left_list, size_t left_list_entries,
240 				 const struct cpg_address *joined_list, size_t joined_list_entries)
241 {
242 	int i;
243 	struct node_info *ninfo;
244 
245 	DEBUGLOG("confchg callback. %zd joined, %zd left, %zd members\n",
246 		 joined_list_entries, left_list_entries, member_list_entries);
247 
248 	for (i=0; i<joined_list_entries; i++) {
249 		ninfo = dm_hash_lookup_binary(node_hash,
250 					      (char *)&joined_list[i].nodeid,
251 					      COROSYNC_CSID_LEN);
252 		if (!ninfo) {
253 			ninfo = malloc(sizeof(struct node_info));
254 			if (!ninfo) {
255 				break;
256 			}
257 			else {
258 				ninfo->nodeid = joined_list[i].nodeid;
259 				dm_hash_insert_binary(node_hash,
260 						      (char *)&ninfo->nodeid,
261 						      COROSYNC_CSID_LEN, ninfo);
262 			}
263 		}
264 		ninfo->state = NODE_CLVMD;
265 	}
266 
267 	for (i=0; i<left_list_entries; i++) {
268 		ninfo = dm_hash_lookup_binary(node_hash,
269 					      (char *)&left_list[i].nodeid,
270 					      COROSYNC_CSID_LEN);
271 		if (ninfo)
272 			ninfo->state = NODE_DOWN;
273 	}
274 
275 	for (i=0; i<member_list_entries; i++) {
276 		if (member_list[i].nodeid == 0) continue;
277 		ninfo = dm_hash_lookup_binary(node_hash,
278 				(char *)&member_list[i].nodeid,
279 				COROSYNC_CSID_LEN);
280 		if (!ninfo) {
281 			ninfo = malloc(sizeof(struct node_info));
282 			if (!ninfo) {
283 				break;
284 			}
285 			else {
286 				ninfo->nodeid = member_list[i].nodeid;
287 				dm_hash_insert_binary(node_hash,
288 						(char *)&ninfo->nodeid,
289 						COROSYNC_CSID_LEN, ninfo);
290 			}
291 		}
292 		ninfo->state = NODE_CLVMD;
293 	}
294 
295 	num_nodes = member_list_entries;
296 }
297 
298 static int _init_cluster(void)
299 {
300 	cs_error_t err;
301 
302 	node_hash = dm_hash_create(100);
303 
304 	err = cpg_initialize(&cpg_handle,
305 			     &corosync_cpg_callbacks);
306 	if (err != CS_OK) {
307 		syslog(LOG_ERR, "Cannot initialise Corosync CPG service: %d",
308 		       err);
309 		DEBUGLOG("Cannot initialise Corosync CPG service: %d", err);
310 		return cs_to_errno(err);
311 	}
312 
313 	err = quorum_initialize(&quorum_handle,
314 				&quorum_callbacks);
315 	if (err != CS_OK) {
316 		syslog(LOG_ERR, "Cannot initialise Corosync quorum service: %d",
317 		       err);
318 		DEBUGLOG("Cannot initialise Corosync quorum service: %d", err);
319 		return cs_to_errno(err);
320 	}
321 
322 
323 	/* Create a lockspace for LV & VG locks to live in */
324 	lockspace = dlm_create_lockspace(LOCKSPACE_NAME, 0600);
325 	if (!lockspace) {
326 		if (errno == EEXIST) {
327 			lockspace = dlm_open_lockspace(LOCKSPACE_NAME);
328 		}
329 		if (!lockspace) {
330 			syslog(LOG_ERR, "Unable to create lockspace for CLVM: %m");
331 			quorum_finalize(quorum_handle);
332 			return -1;
333 		}
334 	}
335 	dlm_ls_pthread_init(lockspace);
336 	DEBUGLOG("DLM initialisation complete\n");
337 
338 	/* Connect to the clvmd group */
339 	strcpy((char *)cpg_group_name.value, "clvmd");
340 	cpg_group_name.length = strlen((char *)cpg_group_name.value);
341 	err = cpg_join(cpg_handle, &cpg_group_name);
342 	if (err != CS_OK) {
343 		cpg_finalize(cpg_handle);
344 		quorum_finalize(quorum_handle);
345 		dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
346 		syslog(LOG_ERR, "Cannot join clvmd process group");
347 		DEBUGLOG("Cannot join clvmd process group: %d\n", err);
348 		return cs_to_errno(err);
349 	}
350 
351 	err = cpg_local_get(cpg_handle,
352 			    &our_nodeid);
353 	if (err != CS_OK) {
354 		cpg_finalize(cpg_handle);
355 		quorum_finalize(quorum_handle);
356 		dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
357 		syslog(LOG_ERR, "Cannot get local node id\n");
358 		return cs_to_errno(err);
359 	}
360 	DEBUGLOG("Our local node id is %d\n", our_nodeid);
361 
362 	DEBUGLOG("Connected to Corosync\n");
363 
364 	return 0;
365 }
366 
367 static void _cluster_closedown(void)
368 {
369 	DEBUGLOG("cluster_closedown\n");
370 	destroy_lvhash();
371 
372 	dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
373 	cpg_finalize(cpg_handle);
374 	quorum_finalize(quorum_handle);
375 }
376 
377 static void _get_our_csid(char *csid)
378 {
379 	memcpy(csid, &our_nodeid, sizeof(int));
380 }
381 
382 /* Corosync doesn't really have nmode names so we
383    just use the node ID in hex instead */
384 static int _csid_from_name(char *csid, const char *name)
385 {
386 	int nodeid;
387 	struct node_info *ninfo;
388 
389 	if (sscanf(name, "%x", &nodeid) == 1) {
390 		ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
391 		if (ninfo)
392 			return nodeid;
393 	}
394 	return -1;
395 }
396 
397 static int _name_from_csid(const char *csid, char *name)
398 {
399 	struct node_info *ninfo;
400 
401 	ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
402 	if (!ninfo)
403 	{
404 		sprintf(name, "UNKNOWN %s", print_corosync_csid(csid));
405 		return -1;
406 	}
407 
408 	sprintf(name, "%x", ninfo->nodeid);
409 	return 0;
410 }
411 
412 static int _get_num_nodes()
413 {
414 	DEBUGLOG("num_nodes = %d\n", num_nodes);
415 	return num_nodes;
416 }
417 
418 /* Node is now known to be running a clvmd */
419 static void _add_up_node(const char *csid)
420 {
421 	struct node_info *ninfo;
422 
423 	ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
424 	if (!ninfo) {
425 		DEBUGLOG("corosync_add_up_node no node_hash entry for csid %s\n",
426 			 print_corosync_csid(csid));
427 		return;
428 	}
429 
430 	DEBUGLOG("corosync_add_up_node %d\n", ninfo->nodeid);
431 
432 	ninfo->state = NODE_CLVMD;
433 
434 	return;
435 }
436 
437 /* Call a callback for each node, so the caller knows whether it's up or down */
438 static int _cluster_do_node_callback(struct local_client *master_client,
439 				     void (*callback)(struct local_client *,
440 						      const char *csid, int node_up))
441 {
442 	struct dm_hash_node *hn;
443 	struct node_info *ninfo;
444 	int somedown = 0;
445 
446 	dm_hash_iterate(hn, node_hash)
447 	{
448 		char csid[COROSYNC_CSID_LEN];
449 
450 		ninfo = dm_hash_get_data(node_hash, hn);
451 		memcpy(csid, dm_hash_get_key(node_hash, hn), COROSYNC_CSID_LEN);
452 
453 		DEBUGLOG("down_callback. node %d, state = %d\n", ninfo->nodeid,
454 			 ninfo->state);
455 
456 		if (ninfo->state != NODE_DOWN)
457 			callback(master_client, csid, ninfo->state == NODE_CLVMD);
458 		if (ninfo->state != NODE_CLVMD)
459 			somedown = -1;
460 	}
461 	return somedown;
462 }
463 
464 /* Real locking */
465 static int _lock_resource(const char *resource, int mode, int flags, int *lockid)
466 {
467 	struct dlm_lksb lksb;
468 	int err;
469 
470 	DEBUGLOG("lock_resource '%s', flags=%d, mode=%d\n", resource, flags, mode);
471 
472 	if (flags & LKF_CONVERT)
473 		lksb.sb_lkid = *lockid;
474 
475 	err = dlm_ls_lock_wait(lockspace,
476 			       mode,
477 			       &lksb,
478 			       flags,
479 			       resource,
480 			       strlen(resource),
481 			       0,
482 			       NULL, NULL, NULL);
483 
484 	if (err != 0)
485 	{
486 		DEBUGLOG("dlm_ls_lock returned %d\n", errno);
487 		return err;
488 	}
489 	if (lksb.sb_status != 0)
490 	{
491 		DEBUGLOG("dlm_ls_lock returns lksb.sb_status %d\n", lksb.sb_status);
492 		errno = lksb.sb_status;
493 		return -1;
494 	}
495 
496 	DEBUGLOG("lock_resource returning %d, lock_id=%x\n", err, lksb.sb_lkid);
497 
498 	*lockid = lksb.sb_lkid;
499 
500 	return 0;
501 }
502 
503 
504 static int _unlock_resource(const char *resource, int lockid)
505 {
506 	struct dlm_lksb lksb;
507 	int err;
508 
509 	DEBUGLOG("unlock_resource: %s lockid: %x\n", resource, lockid);
510 	lksb.sb_lkid = lockid;
511 
512 	err = dlm_ls_unlock_wait(lockspace,
513 				 lockid,
514 				 0,
515 				 &lksb);
516 	if (err != 0)
517 	{
518 		DEBUGLOG("Unlock returned %d\n", err);
519 		return err;
520 	}
521 	if (lksb.sb_status != EUNLOCK)
522 	{
523 		DEBUGLOG("dlm_ls_unlock_wait returns lksb.sb_status: %d\n", lksb.sb_status);
524 		errno = lksb.sb_status;
525 		return -1;
526 	}
527 
528 
529 	return 0;
530 }
531 
532 static int _is_quorate()
533 {
534 	int quorate;
535 	if (quorum_getquorate(quorum_handle, &quorate) == CS_OK)
536 		return quorate;
537 	else
538 		return 0;
539 }
540 
541 static int _get_main_cluster_fd(void)
542 {
543 	int select_fd;
544 
545 	cpg_fd_get(cpg_handle, &select_fd);
546 	return select_fd;
547 }
548 
549 static int _cluster_fd_callback(struct local_client *fd, char *buf, int len,
550 				const char *csid,
551 				struct local_client **new_client)
552 {
553 	cluster_client = fd;
554 	*new_client = NULL;
555 	cpg_dispatch(cpg_handle, CS_DISPATCH_ONE);
556 	return 1;
557 }
558 
559 static int _cluster_send_message(const void *buf, int msglen, const char *csid,
560 				 const char *errtext)
561 {
562 	struct iovec iov[2];
563 	cs_error_t err;
564 	int target_node;
565 
566 	if (csid)
567 		memcpy(&target_node, csid, COROSYNC_CSID_LEN);
568 	else
569 		target_node = 0;
570 
571 	iov[0].iov_base = &target_node;
572 	iov[0].iov_len = sizeof(int);
573 	iov[1].iov_base = (char *)buf;
574 	iov[1].iov_len = msglen;
575 
576 	err = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2);
577 	return cs_to_errno(err);
578 }
579 
580 /*
581  * We are not necessarily connected to a Red Hat Cluster system,
582  * but if we are, this returns the cluster name from cluster.conf.
583  * I've used confdb rather than ccs to reduce the inter-package
584  * dependancies as well as to allow people to set a cluster name
585  * for themselves even if they are not running on RH cluster.
586  */
587 static int _get_cluster_name(char *buf, int buflen)
588 {
589 	confdb_handle_t handle;
590 	int result;
591 	size_t namelen = buflen;
592 	hdb_handle_t cluster_handle;
593 	confdb_callbacks_t callbacks = {
594 		.confdb_key_change_notify_fn = NULL,
595 		.confdb_object_create_change_notify_fn = NULL,
596 		.confdb_object_delete_change_notify_fn = NULL
597 	};
598 
599 	/* This is a default in case everything else fails */
600 	strncpy(buf, "Corosync", buflen);
601 
602 	/* Look for a cluster name in confdb */
603 	result = confdb_initialize (&handle, &callbacks);
604         if (result != CS_OK)
605 		return 0;
606 
607         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
608 	if (result != CS_OK)
609 		goto out;
610 
611         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
612         if (result != CS_OK)
613 		goto out;
614 
615         result = confdb_key_get(handle, cluster_handle, (void *)"name", strlen("name"), buf, &namelen);
616         if (result != CS_OK)
617 		goto out;
618 
619 	buf[namelen] = '\0';
620 
621 out:
622 	confdb_finalize(handle);
623 	return 0;
624 }
625 
626 static struct cluster_ops _cluster_corosync_ops = {
627 	.cluster_init_completed   = NULL,
628 	.cluster_send_message     = _cluster_send_message,
629 	.name_from_csid           = _name_from_csid,
630 	.csid_from_name           = _csid_from_name,
631 	.get_num_nodes            = _get_num_nodes,
632 	.cluster_fd_callback      = _cluster_fd_callback,
633 	.get_main_cluster_fd      = _get_main_cluster_fd,
634 	.cluster_do_node_callback = _cluster_do_node_callback,
635 	.is_quorate               = _is_quorate,
636 	.get_our_csid             = _get_our_csid,
637 	.add_up_node              = _add_up_node,
638 	.reread_config            = NULL,
639 	.cluster_closedown        = _cluster_closedown,
640 	.get_cluster_name         = _get_cluster_name,
641 	.sync_lock                = _lock_resource,
642 	.sync_unlock              = _unlock_resource,
643 };
644 
645 struct cluster_ops *init_corosync_cluster(void)
646 {
647 	if (!_init_cluster())
648 		return &_cluster_corosync_ops;
649 	else
650 		return NULL;
651 }
652