1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
/* number of lockspaces created through dlm_new_lockspace();
 * modified only under ls_lock (see __dlm_new_lockspace and
 * dlm_release_lockspace) */
static int ls_count;
/* serializes lockspace create/release and midcomms start/stop */
static struct mutex ls_lock;
/* list of all lockspaces, linked via ls->ls_list */
static struct list_head lslist;
/* protects lslist and each ls->ls_create_count */
static spinlock_t lslist_lock;
32
dlm_control_store(struct dlm_ls * ls,const char * buf,size_t len)33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35 ssize_t ret = len;
36 int n;
37 int rc = kstrtoint(buf, 0, &n);
38
39 if (rc)
40 return rc;
41 ls = dlm_find_lockspace_local(ls->ls_local_handle);
42 if (!ls)
43 return -EINVAL;
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 dlm_put_lockspace(ls);
56 return ret;
57 }
58
dlm_event_store(struct dlm_ls * ls,const char * buf,size_t len)59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63 if (rc)
64 return rc;
65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 wake_up(&ls->ls_uevent_wait);
67 return len;
68 }
69
dlm_id_show(struct dlm_ls * ls,char * buf)70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74
dlm_id_store(struct dlm_ls * ls,const char * buf,size_t len)75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79 if (rc)
80 return rc;
81 return len;
82 }
83
dlm_nodir_show(struct dlm_ls * ls,char * buf)84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88
dlm_nodir_store(struct dlm_ls * ls,const char * buf,size_t len)89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91 int val;
92 int rc = kstrtoint(buf, 0, &val);
93
94 if (rc)
95 return rc;
96 if (val == 1)
97 set_bit(LSFL_NODIR, &ls->ls_flags);
98 return len;
99 }
100
dlm_recover_status_show(struct dlm_ls * ls,char * buf)101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103 uint32_t status = dlm_recover_status(ls);
104 return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106
dlm_recover_nodeid_show(struct dlm_ls * ls,char * buf)107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111
/* A lockspace sysfs attribute: the generic attribute plus optional
 * show/store callbacks that receive the owning dlm_ls directly.
 */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
117
/* per-lockspace sysfs attributes; dlm_controld drives the lockspace
 * through "control" and "event_done" */

/* write-only: start/stop the lockspace (dlm_control_store) */
static struct dlm_attr dlm_attr_control = {
	.attr = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* write-only: report uevent completion (dlm_event_store) */
static struct dlm_attr dlm_attr_event = {
	.attr = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* read/write: lockspace global id */
static struct dlm_attr dlm_attr_id = {
	.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_id_show,
	.store = dlm_id_store
};

/* read/write: "no directory" mode flag */
static struct dlm_attr dlm_attr_nodir = {
	.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_nodir_show,
	.store = dlm_nodir_store
};

/* read-only: recovery status bits */
static struct dlm_attr dlm_attr_recover_status = {
	.attr = {.name = "recover_status", .mode = S_IRUGO},
	.show = dlm_recover_status_show
};

/* read-only: nodeid being recovered */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
/* generates dlm_groups for dlm_ktype.default_groups below */
ATTRIBUTE_GROUPS(dlm);
160
dlm_attr_show(struct kobject * kobj,struct attribute * attr,char * buf)161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 char *buf)
163 {
164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 return a->show ? a->show(ls, buf) : 0;
167 }
168
dlm_attr_store(struct kobject * kobj,struct attribute * attr,const char * buf,size_t len)169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 const char *buf, size_t len)
171 {
172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 return a->store ? a->store(ls, buf, len) : len;
175 }
176
lockspace_kobj_release(struct kobject * k)177 static void lockspace_kobj_release(struct kobject *k)
178 {
179 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
180 kfree(ls);
181 }
182
/* route generic sysfs show/store through the dlm_attr callbacks */
static const struct sysfs_ops dlm_attr_ops = {
	.show = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for a lockspace; release() frees the dlm_ls */
static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops = &dlm_attr_ops,
	.release = lockspace_kobj_release,
};

/* /sys/kernel/dlm, created in dlm_lockspace_init() */
static struct kset *dlm_kset;
195
do_uevent(struct dlm_ls * ls,int in)196 static int do_uevent(struct dlm_ls *ls, int in)
197 {
198 if (in)
199 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200 else
201 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202
203 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204
205 /* dlm_controld will see the uevent, do the necessary group management
206 and then write to sysfs to wake us */
207
208 wait_event(ls->ls_uevent_wait,
209 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210
211 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212
213 return ls->ls_uevent_result;
214 }
215
dlm_uevent(const struct kobject * kobj,struct kobj_uevent_env * env)216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217 {
218 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219
220 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221 return 0;
222 }
223
/* uevent ops for the dlm kset; adds LOCKSPACE=<name> to each event */
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
227
dlm_lockspace_init(void)228 int __init dlm_lockspace_init(void)
229 {
230 ls_count = 0;
231 mutex_init(&ls_lock);
232 INIT_LIST_HEAD(&lslist);
233 spin_lock_init(&lslist_lock);
234
235 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236 if (!dlm_kset) {
237 printk(KERN_WARNING "%s: can not create kset\n", __func__);
238 return -ENOMEM;
239 }
240 return 0;
241 }
242
/* Module-exit time teardown: remove the /sys/kernel/dlm kset */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
247
/* Look up a lockspace by global id.  On success a reference is taken
 * (ls_count) which the caller drops with dlm_put_lockspace(); returns
 * NULL when no lockspace matches.
 */
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *iter, *found = NULL;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(iter, &lslist, ls_list) {
		if (iter->ls_global_id == id) {
			atomic_inc(&iter->ls_count);
			found = iter;
			break;
		}
	}
	spin_unlock_bh(&lslist_lock);

	return found;
}
265
/* Look up a lockspace by its local handle (set to the ls pointer in
 * new_lockspace()).  Takes a reference on success; returns NULL when
 * the handle does not match any live lockspace.
 */
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *iter, *found = NULL;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(iter, &lslist, ls_list) {
		if (iter->ls_local_handle == lockspace) {
			atomic_inc(&iter->ls_count);
			found = iter;
			break;
		}
	}
	spin_unlock_bh(&lslist_lock);

	return found;
}
282
dlm_find_lockspace_device(int minor)283 struct dlm_ls *dlm_find_lockspace_device(int minor)
284 {
285 struct dlm_ls *ls;
286
287 spin_lock_bh(&lslist_lock);
288 list_for_each_entry(ls, &lslist, ls_list) {
289 if (ls->ls_device.minor == minor) {
290 atomic_inc(&ls->ls_count);
291 goto out;
292 }
293 }
294 ls = NULL;
295 out:
296 spin_unlock_bh(&lslist_lock);
297 return ls;
298 }
299
/* Drop a reference taken by one of the dlm_find_lockspace_* lookups;
 * the last put wakes anyone waiting in remove_lockspace(). */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}
305
remove_lockspace(struct dlm_ls * ls)306 static void remove_lockspace(struct dlm_ls *ls)
307 {
308 retry:
309 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
310
311 spin_lock_bh(&lslist_lock);
312 if (atomic_read(&ls->ls_count) != 0) {
313 spin_unlock_bh(&lslist_lock);
314 goto retry;
315 }
316
317 WARN_ON(ls->ls_create_count != 0);
318 list_del(&ls->ls_list);
319 spin_unlock_bh(&lslist_lock);
320 }
321
/* Thread for sending/receiving messages for all lockspace's */
static int threads_start(void)
{
	int error = dlm_midcomms_start();

	if (error)
		log_print("cannot start dlm midcomms %d", error);
	return error;
}
333
/*
 * Create a new lockspace, or bump the create count of an existing one
 * with the same name.  Returns 0 on fresh creation (with *lockspace
 * set), a positive value when an existing lockspace was reused, or a
 * negative errno on failure.  Caller holds ls_lock.
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int do_unreg = 0;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	/* lock value block length must be a multiple of 8 */
	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	/* tell the caller whether recovery callbacks are supported */
	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/* reuse an existing lockspace of the same name unless the caller
	 * asked for exclusive creation with DLM_LSFL_NEWEXCL */
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;	/* positive: existing lockspace reused */
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));

	INIT_LIST_HEAD(&ls->ls_toss);
	INIT_LIST_HEAD(&ls->ls_keep);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	idr_init(&ls->ls_lkbidr);
	rwlock_init(&ls->ls_lkbidr_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due backwards compatibility with 3.1 we need to use maximum
	 * possible dlm message size to be sure the message will fit and
	 * not having out of bounds issues. However on sending side 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbidr;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	/* the local handle is simply the ls pointer itself; see
	 * dlm_find_lockspace_local() */
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_toss_q);
	spin_lock_init(&ls->ls_toss_q_lock);
	timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,
		    TIMER_DEFERRABLE);

	/* make the lockspace visible on lslist */
	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	/* error unwinding: each label undoes the steps taken after the
	 * previous one, in reverse order of setup */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
601
/* Common entry for kernel and user lockspace creation: serializes on
 * ls_lock, brings up midcomms for the first lockspace and tears it
 * back down if creation fails while no lockspace exists.
 */
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int rv = 0;

	mutex_lock(&ls_lock);

	/* first lockspace starts the comms layer */
	if (!ls_count)
		rv = threads_start();
	if (rv)
		goto out;

	rv = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			   ops_result, lockspace);
	if (!rv)
		ls_count++;
	/* a positive return means an existing lockspace was reused */
	if (rv > 0)
		rv = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return rv;
}
630
/* Kernel (filesystem) entry point: always forces DLM_LSFL_FS so a
 * callback workqueue is started for the lockspace. */
int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}
639
/* Userspace entry point: passes flags through unchanged (no
 * DLM_LSFL_FS forced). */
int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}
649
lkb_idr_is_local(int id,void * p,void * data)650 static int lkb_idr_is_local(int id, void *p, void *data)
651 {
652 struct dlm_lkb *lkb = p;
653
654 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
655 }
656
/* idr_for_each callback: matches every lkb, used to test "any lkb
 * exists at all" */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
661
/* idr_for_each callback: free one lkb; a master-copy lkb owns its lvb
 * buffer so that is freed first */
static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}
672
673 /* NOTE: We check the lkbidr here rather than the resource table.
674 This is because there may be LKBs queued as ASTs that have been unlinked
675 from their RSBs and are pending deletion once the AST has been delivered */
676
lockspace_busy(struct dlm_ls * ls,int force)677 static int lockspace_busy(struct dlm_ls *ls, int force)
678 {
679 int rv;
680
681 read_lock_bh(&ls->ls_lkbidr_lock);
682 if (force == 0) {
683 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
684 } else if (force == 1) {
685 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
686 } else {
687 rv = 0;
688 }
689 read_unlock_bh(&ls->ls_lkbidr_lock);
690 return rv;
691 }
692
/* rhashtable_free_and_destroy callback: free one rsb */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
699
/*
 * Drop one create reference on the lockspace and, when the last one
 * goes, tear the lockspace down completely.  Returns 0 when the
 * lockspace was destroyed, a positive remaining create count, -EBUSY
 * when lkbs block the release (per lockspace_busy), or -EINVAL.
 * Caller holds ls_lock.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	/* force 3 skips the leave uevent (forced shutdown, daemon gone) */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(), we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_timer);

	/* last lockspace: drop cluster membership and shut comms down */
	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	/* waits for lookup references to drain, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	/* drain the pre-allocated rsb list */
	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	module_put(THIS_MODULE);
	return 0;
}
793
794 /*
795 * Called when a system has released all its locks and is not going to use the
796 * lockspace any longer. We free everything we're managing for this lockspace.
797 * Remaining nodes will go through the recovery process as if we'd died. The
798 * lockspace must continue to function as usual, participating in recoveries,
799 * until this returns.
800 *
801 * Force has 4 possible values:
802 * 0 - don't destroy lockspace if it has any LKBs
803 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804 * 2 - destroy lockspace regardless of LKBs
805 * 3 - destroy lockspace as part of a forced shutdown
806 */
807
dlm_release_lockspace(void * lockspace,int force)808 int dlm_release_lockspace(void *lockspace, int force)
809 {
810 struct dlm_ls *ls;
811 int error;
812
813 ls = dlm_find_lockspace_local(lockspace);
814 if (!ls)
815 return -EINVAL;
816 dlm_put_lockspace(ls);
817
818 mutex_lock(&ls_lock);
819 error = release_lockspace(ls, force);
820 if (!error)
821 ls_count--;
822 if (!ls_count)
823 dlm_midcomms_stop();
824 mutex_unlock(&ls_lock);
825
826 return error;
827 }
828
/* Stop every lockspace that is still running (logged as happening
 * because no userland control daemon is available).  The list lock
 * must be dropped around dlm_ls_stop(), so the scan restarts from the
 * head after each stop; already-stopped lockspaces are only counted. */
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
852