~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/dlm/lockspace.c

Version: ~ [ linux-6.11.5 ] ~ [ linux-6.10.14 ] ~ [ linux-6.9.12 ] ~ [ linux-6.8.12 ] ~ [ linux-6.7.12 ] ~ [ linux-6.6.58 ] ~ [ linux-6.5.13 ] ~ [ linux-6.4.16 ] ~ [ linux-6.3.13 ] ~ [ linux-6.2.16 ] ~ [ linux-6.1.114 ] ~ [ linux-6.0.19 ] ~ [ linux-5.19.17 ] ~ [ linux-5.18.19 ] ~ [ linux-5.17.15 ] ~ [ linux-5.16.20 ] ~ [ linux-5.15.169 ] ~ [ linux-5.14.21 ] ~ [ linux-5.13.19 ] ~ [ linux-5.12.19 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.228 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.284 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.322 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.336 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.337 ] ~ [ linux-4.4.302 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.9 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 // SPDX-License-Identifier: GPL-2.0-only
  2 /******************************************************************************
  3 *******************************************************************************
  4 **
  5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7 **
  8 **
  9 *******************************************************************************
 10 ******************************************************************************/
 11 
 12 #include <linux/module.h>
 13 
 14 #include "dlm_internal.h"
 15 #include "lockspace.h"
 16 #include "member.h"
 17 #include "recoverd.h"
 18 #include "dir.h"
 19 #include "midcomms.h"
 20 #include "config.h"
 21 #include "memory.h"
 22 #include "lock.h"
 23 #include "recover.h"
 24 #include "requestqueue.h"
 25 #include "user.h"
 26 #include "ast.h"
 27 
 28 static int                      ls_count;      /* lockspaces created and not yet released; changed under ls_lock */
 29 static struct mutex             ls_lock;       /* held across lockspace creation and release */
 30 static struct list_head         lslist;        /* every struct dlm_ls, linked through ls_list */
 31 static spinlock_t               lslist_lock;   /* taken (with _bh) around every walk or change of lslist */
 32 
 33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 34 {
 35         ssize_t ret = len;
 36         int n;
 37         int rc = kstrtoint(buf, 0, &n);
 38 
 39         if (rc)
 40                 return rc;
 41         ls = dlm_find_lockspace_local(ls);
 42         if (!ls)
 43                 return -EINVAL;
 44 
 45         switch (n) {
 46         case 0:
 47                 dlm_ls_stop(ls);
 48                 break;
 49         case 1:
 50                 dlm_ls_start(ls);
 51                 break;
 52         default:
 53                 ret = -EINVAL;
 54         }
 55         dlm_put_lockspace(ls);
 56         return ret;
 57 }
 58 
 59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 60 {
 61         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 62 
 63         if (rc)
 64                 return rc;
 65         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 66         wake_up(&ls->ls_uevent_wait);
 67         return len;
 68 }
 69 
 70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 71 {
 72         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 73 }
 74 
 75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 76 {
 77         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 78 
 79         if (rc)
 80                 return rc;
 81         return len;
 82 }
 83 
 84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 85 {
 86         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 87 }
 88 
 89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 90 {
 91         int val;
 92         int rc = kstrtoint(buf, 0, &val);
 93 
 94         if (rc)
 95                 return rc;
 96         if (val == 1)
 97                 set_bit(LSFL_NODIR, &ls->ls_flags);
 98         return len;
 99 }
100 
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103         uint32_t status = dlm_recover_status(ls);
104         return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106 
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111 
112 struct dlm_attr {
113         struct attribute attr;
114         ssize_t (*show)(struct dlm_ls *, char *);
115         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116 };
117 
118 static struct dlm_attr dlm_attr_control = {
119         .attr  = {.name = "control", .mode = S_IWUSR},
120         .store = dlm_control_store
121 };
122 
123 static struct dlm_attr dlm_attr_event = {
124         .attr  = {.name = "event_done", .mode = S_IWUSR},
125         .store = dlm_event_store
126 };
127 
128 static struct dlm_attr dlm_attr_id = {
129         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130         .show  = dlm_id_show,
131         .store = dlm_id_store
132 };
133 
134 static struct dlm_attr dlm_attr_nodir = {
135         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136         .show  = dlm_nodir_show,
137         .store = dlm_nodir_store
138 };
139 
140 static struct dlm_attr dlm_attr_recover_status = {
141         .attr  = {.name = "recover_status", .mode = S_IRUGO},
142         .show  = dlm_recover_status_show
143 };
144 
145 static struct dlm_attr dlm_attr_recover_nodeid = {
146         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147         .show  = dlm_recover_nodeid_show
148 };
149 
150 static struct attribute *dlm_attrs[] = {
151         &dlm_attr_control.attr,
152         &dlm_attr_event.attr,
153         &dlm_attr_id.attr,
154         &dlm_attr_nodir.attr,
155         &dlm_attr_recover_status.attr,
156         &dlm_attr_recover_nodeid.attr,
157         NULL,
158 };
159 ATTRIBUTE_GROUPS(dlm);
160 
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162                              char *buf)
163 {
164         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166         return a->show ? a->show(ls, buf) : 0;
167 }
168 
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170                               const char *buf, size_t len)
171 {
172         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174         return a->store ? a->store(ls, buf, len) : len;
175 }
176 
177 static void lockspace_kobj_release(struct kobject *k)
178 {
179         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
180         kfree(ls);
181 }
182 
183 static const struct sysfs_ops dlm_attr_ops = {
184         .show  = dlm_attr_show,
185         .store = dlm_attr_store,
186 };
187 
188 static struct kobj_type dlm_ktype = {
189         .default_groups = dlm_groups,
190         .sysfs_ops     = &dlm_attr_ops,
191         .release       = lockspace_kobj_release,
192 };
193 
194 static struct kset *dlm_kset;
195 
196 static int do_uevent(struct dlm_ls *ls, int in)
197 {
198         if (in)
199                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200         else
201                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202 
203         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204 
205         /* dlm_controld will see the uevent, do the necessary group management
206            and then write to sysfs to wake us */
207 
208         wait_event(ls->ls_uevent_wait,
209                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210 
211         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212 
213         return ls->ls_uevent_result;
214 }
215 
216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217 {
218         const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219 
220         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221         return 0;
222 }
223 
224 static const struct kset_uevent_ops dlm_uevent_ops = {
225         .uevent = dlm_uevent,
226 };
227 
228 int __init dlm_lockspace_init(void)
229 {
230         ls_count = 0;
231         mutex_init(&ls_lock);
232         INIT_LIST_HEAD(&lslist);
233         spin_lock_init(&lslist_lock);
234 
235         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236         if (!dlm_kset) {
237                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
238                 return -ENOMEM;
239         }
240         return 0;
241 }
242 
243 void dlm_lockspace_exit(void)
244 {
245         kset_unregister(dlm_kset);
246 }
247 
248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
249 {
250         struct dlm_ls *ls;
251 
252         spin_lock_bh(&lslist_lock);
253 
254         list_for_each_entry(ls, &lslist, ls_list) {
255                 if (ls->ls_global_id == id) {
256                         atomic_inc(&ls->ls_count);
257                         goto out;
258                 }
259         }
260         ls = NULL;
261  out:
262         spin_unlock_bh(&lslist_lock);
263         return ls;
264 }
265 
266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
267 {
268         struct dlm_ls *ls = lockspace;
269 
270         atomic_inc(&ls->ls_count);
271         return ls;
272 }
273 
274 struct dlm_ls *dlm_find_lockspace_device(int minor)
275 {
276         struct dlm_ls *ls;
277 
278         spin_lock_bh(&lslist_lock);
279         list_for_each_entry(ls, &lslist, ls_list) {
280                 if (ls->ls_device.minor == minor) {
281                         atomic_inc(&ls->ls_count);
282                         goto out;
283                 }
284         }
285         ls = NULL;
286  out:
287         spin_unlock_bh(&lslist_lock);
288         return ls;
289 }
290 
291 void dlm_put_lockspace(struct dlm_ls *ls)
292 {
293         if (atomic_dec_and_test(&ls->ls_count))
294                 wake_up(&ls->ls_count_wait);
295 }
296 
297 static void remove_lockspace(struct dlm_ls *ls)
298 {
299 retry:
300         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
301 
302         spin_lock_bh(&lslist_lock);
303         if (atomic_read(&ls->ls_count) != 0) {
304                 spin_unlock_bh(&lslist_lock);
305                 goto retry;
306         }
307 
308         WARN_ON(ls->ls_create_count != 0);
309         list_del(&ls->ls_list);
310         spin_unlock_bh(&lslist_lock);
311 }
312 
/*
 * Start the midcomms layer, which sends and receives messages for all
 * lockspaces.  Returns the result of dlm_midcomms_start(); a failure is
 * also logged.
 */
static int threads_start(void)
{
        int rv = dlm_midcomms_start();

        if (rv != 0)
                log_print("cannot start dlm midcomms %d", rv);

        return rv;
}
324 
325 static int new_lockspace(const char *name, const char *cluster,
326                          uint32_t flags, int lvblen,
327                          const struct dlm_lockspace_ops *ops, void *ops_arg,
328                          int *ops_result, dlm_lockspace_t **lockspace)
329 {
            /*
             * Create the lockspace called @name, or take another create
             * reference on an existing lockspace with the same name.
             *
             * Returns:
             *   0        a new lockspace was created and stored in *lockspace
             *   1        an existing lockspace was found; its ls_create_count
             *            was incremented and it was stored in *lockspace
             *   -EINVAL  empty/too long name, lvblen not a multiple of 8, or
             *            try_module_get() failed
             *   -EUNATCH dlm_user_daemon_available() returned false
             *   -EBADR   @cluster does not match the configured cluster name
             *   -EEXIST  the name exists and DLM_LSFL_NEWEXCL was requested
             *   -ENOMEM  an allocation failed
             *   or the error reported by rhashtable_init(),
             *   dlm_callback_start(), dlm_recoverd_start(),
             *   kobject_init_and_add(), do_uevent() or ls_recovery_result.
             *
             * The module reference taken below is kept only when 0 is
             * returned; every path through "out" drops it again.
             */
330         struct dlm_ls *ls;
331         int do_unreg = 0;
332         int namelen = strlen(name);
333         int error;
334 
335         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
336                 return -EINVAL;
337 
338         if (lvblen % 8)
339                 return -EINVAL;
340 
341         if (!try_module_get(THIS_MODULE))
342                 return -EINVAL;
343 
344         if (!dlm_user_daemon_available()) {
345                 log_print("dlm user daemon not available");
346                 error = -EUNATCH;
347                 goto out;
348         }
349 
            /* tell the caller whether its recovery callbacks will be used */
350         if (ops && ops_result) {
351                 if (!dlm_config.ci_recover_callbacks)
352                         *ops_result = -EOPNOTSUPP;
353                 else
354                         *ops_result = 0;
355         }
356 
357         if (!cluster)
358                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
359                           dlm_config.ci_cluster_name);
360 
361         if (dlm_config.ci_recover_callbacks && cluster &&
362             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
363                 log_print("dlm cluster name '%s' does not match "
364                           "the application cluster name '%s'",
365                           dlm_config.ci_cluster_name, cluster);
366                 error = -EBADR;
367                 goto out;
368         }
369 
370         error = 0;
371 
            /*
             * Look for an existing lockspace with this name.  error stays 0
             * when there is none, becomes 1 when one is reused, and -EEXIST
             * when the caller asked for exclusive creation.
             */
372         spin_lock_bh(&lslist_lock);
373         list_for_each_entry(ls, &lslist, ls_list) {
374                 WARN_ON(ls->ls_create_count <= 0);
375                 if (ls->ls_namelen != namelen)
376                         continue;
377                 if (memcmp(ls->ls_name, name, namelen))
378                         continue;
379                 if (flags & DLM_LSFL_NEWEXCL) {
380                         error = -EEXIST;
381                         break;
382                 }
383                 ls->ls_create_count++;
384                 *lockspace = ls;
385                 error = 1;
386                 break;
387         }
388         spin_unlock_bh(&lslist_lock);
389 
            /* both the reuse case (1) and -EEXIST leave here */
390         if (error)
391                 goto out;
392 
393         error = -ENOMEM;
394 
395         ls = kzalloc(sizeof(*ls), GFP_NOFS);
396         if (!ls)
397                 goto out;
398         memcpy(ls->ls_name, name, namelen);
399         ls->ls_namelen = namelen;
400         ls->ls_lvblen = lvblen;
401         atomic_set(&ls->ls_count, 0);
402         init_waitqueue_head(&ls->ls_count_wait);
403         ls->ls_flags = 0;
404 
405         if (ops && dlm_config.ci_recover_callbacks) {
406                 ls->ls_ops = ops;
407                 ls->ls_ops_arg = ops_arg;
408         }
409 
410         if (flags & DLM_LSFL_SOFTIRQ)
411                 set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
412 
413         /* ls_exflags are forced to match among nodes, and we don't
414          * need to require all nodes to have some flags set
415          */
416         ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
417                                     DLM_LSFL_SOFTIRQ));
418 
419         INIT_LIST_HEAD(&ls->ls_slow_inactive);
420         INIT_LIST_HEAD(&ls->ls_slow_active);
421         rwlock_init(&ls->ls_rsbtbl_lock);
422 
423         error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
424         if (error)
425                 goto out_lsfree;
426 
427         xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
428         rwlock_init(&ls->ls_lkbxa_lock);
429 
430         INIT_LIST_HEAD(&ls->ls_waiters);
431         spin_lock_init(&ls->ls_waiters_lock);
432         INIT_LIST_HEAD(&ls->ls_orphans);
433         spin_lock_init(&ls->ls_orphans_lock);
434 
435         INIT_LIST_HEAD(&ls->ls_nodes);
436         INIT_LIST_HEAD(&ls->ls_nodes_gone);
437         ls->ls_num_nodes = 0;
438         ls->ls_low_nodeid = 0;
439         ls->ls_total_weight = 0;
440         ls->ls_node_array = NULL;
441 
442         memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
443         ls->ls_local_rsb.res_ls = ls;
444 
445         ls->ls_debug_rsb_dentry = NULL;
446         ls->ls_debug_waiters_dentry = NULL;
447 
448         init_waitqueue_head(&ls->ls_uevent_wait);
449         ls->ls_uevent_result = 0;
450         init_completion(&ls->ls_recovery_done);
451         ls->ls_recovery_result = -1;
452 
453         spin_lock_init(&ls->ls_cb_lock);
454         INIT_LIST_HEAD(&ls->ls_cb_delay);
455 
456         ls->ls_recoverd_task = NULL;
457         mutex_init(&ls->ls_recoverd_active);
458         spin_lock_init(&ls->ls_recover_lock);
459         spin_lock_init(&ls->ls_rcom_spin);
460         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
461         ls->ls_recover_status = 0;
462         ls->ls_recover_seq = get_random_u64();
463         ls->ls_recover_args = NULL;
464         init_rwsem(&ls->ls_in_recovery);
465         rwlock_init(&ls->ls_recv_active);
466         INIT_LIST_HEAD(&ls->ls_requestqueue);
467         rwlock_init(&ls->ls_requestqueue_lock);
468         spin_lock_init(&ls->ls_clear_proc_locks);
469 
470         /* For backwards compatibility with 3.1 we need to use the maximum
471          * possible dlm message size to be sure the message will fit and
472          * to avoid out-of-bounds issues. However, the sending side of 3.2
473          * might send less.
474          */
475         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
476         if (!ls->ls_recover_buf) {
477                 error = -ENOMEM;
478                 goto out_lkbxa;
479         }
480 
481         ls->ls_slot = 0;
482         ls->ls_num_slots = 0;
483         ls->ls_slots_size = 0;
484         ls->ls_slots = NULL;
485 
486         INIT_LIST_HEAD(&ls->ls_recover_list);
487         spin_lock_init(&ls->ls_recover_list_lock);
488         xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
489         spin_lock_init(&ls->ls_recover_xa_lock);
490         ls->ls_recover_list_count = 0;
491         init_waitqueue_head(&ls->ls_wait_general);
492         INIT_LIST_HEAD(&ls->ls_masters_list);
493         rwlock_init(&ls->ls_masters_lock);
494         INIT_LIST_HEAD(&ls->ls_dir_dump_list);
495         rwlock_init(&ls->ls_dir_dump_lock);
496 
497         INIT_LIST_HEAD(&ls->ls_scan_list);
498         spin_lock_init(&ls->ls_scan_lock);
499         timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
500 
            /* publish the lockspace on lslist with its first create reference */
501         spin_lock_bh(&lslist_lock);
502         ls->ls_create_count = 1;
503         list_add(&ls->ls_list, &lslist);
504         spin_unlock_bh(&lslist_lock);
505 
506         if (flags & DLM_LSFL_FS)
507                 set_bit(LSFL_FS, &ls->ls_flags);
508 
509         error = dlm_callback_start(ls);
510         if (error) {
511                 log_error(ls, "can't start dlm_callback %d", error);
512                 goto out_delist;
513         }
514 
515         init_waitqueue_head(&ls->ls_recover_lock_wait);
516 
517         /*
518          * Once started, dlm_recoverd first looks for ls in lslist, then
519          * initializes ls_in_recovery as locked in "down" mode.  We need
520          * to wait for the wakeup from dlm_recoverd because in_recovery
521          * has to start out in down mode.
522          */
523 
524         error = dlm_recoverd_start(ls);
525         if (error) {
526                 log_error(ls, "can't start dlm_recoverd %d", error);
527                 goto out_callback;
528         }
529 
530         wait_event(ls->ls_recover_lock_wait,
531                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
532 
533         /* let kobject handle freeing of ls if there's an error */
534         do_unreg = 1;
535 
536         ls->ls_kobj.kset = dlm_kset;
537         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
538                                      "%s", ls->ls_name);
539         if (error)
540                 goto out_recoverd;
541         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
542 
543         /* This uevent triggers dlm_controld in userspace to add us to the
544            group of nodes that are members of this lockspace (managed by the
545            cluster infrastructure.)  Once it's done that, it tells us who the
546            current lockspace members are (via configfs) and then tells the
547            lockspace to start running (via sysfs) in dlm_ls_start(). */
548 
549         error = do_uevent(ls, 1);
550         if (error)
551                 goto out_recoverd;
552 
553         /* wait until recovery is successful or failed */
554         wait_for_completion(&ls->ls_recovery_done);
555         error = ls->ls_recovery_result;
556         if (error)
557                 goto out_members;
558 
559         dlm_create_debug_file(ls);
560 
561         log_rinfo(ls, "join complete");
562         *lockspace = ls;
563         return 0;
564 
            /*
             * Error unwind: the labels run in reverse order of the setup
             * above, so each goto enters at the point matching what had
             * already been set up.
             */
565  out_members:
566         do_uevent(ls, 0);
567         dlm_clear_members(ls);
568         kfree(ls->ls_node_array);
569  out_recoverd:
570         dlm_recoverd_stop(ls);
571  out_callback:
572         dlm_callback_stop(ls);
573  out_delist:
574         spin_lock_bh(&lslist_lock);
575         list_del(&ls->ls_list);
576         spin_unlock_bh(&lslist_lock);
577         xa_destroy(&ls->ls_recover_xa);
578         kfree(ls->ls_recover_buf);
579  out_lkbxa:
580         xa_destroy(&ls->ls_lkbxa);
581         rhashtable_destroy(&ls->ls_rsbtbl);
582  out_lsfree:
            /* once do_unreg is set, ls is freed by lockspace_kobj_release() */
583         if (do_unreg)
584                 kobject_put(&ls->ls_kobj);
585         else
586                 kfree(ls);
587  out:
588         module_put(THIS_MODULE);
589         return error;
590 }
591 
592 static int __dlm_new_lockspace(const char *name, const char *cluster,
593                                uint32_t flags, int lvblen,
594                                const struct dlm_lockspace_ops *ops,
595                                void *ops_arg, int *ops_result,
596                                dlm_lockspace_t **lockspace)
597 {
598         int error = 0;
599 
600         mutex_lock(&ls_lock);
601         if (!ls_count)
602                 error = threads_start();
603         if (error)
604                 goto out;
605 
606         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
607                               ops_result, lockspace);
608         if (!error)
609                 ls_count++;
610         if (error > 0)
611                 error = 0;
612         if (!ls_count) {
613                 dlm_midcomms_shutdown();
614                 dlm_midcomms_stop();
615         }
616  out:
617         mutex_unlock(&ls_lock);
618         return error;
619 }
620 
621 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
622                       int lvblen, const struct dlm_lockspace_ops *ops,
623                       void *ops_arg, int *ops_result,
624                       dlm_lockspace_t **lockspace)
625 {
626         return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
627                                    ops, ops_arg, ops_result, lockspace);
628 }
629 
630 int dlm_new_user_lockspace(const char *name, const char *cluster,
631                            uint32_t flags, int lvblen,
632                            const struct dlm_lockspace_ops *ops,
633                            void *ops_arg, int *ops_result,
634                            dlm_lockspace_t **lockspace)
635 {
636         if (flags & DLM_LSFL_SOFTIRQ)
637                 return -EINVAL;
638 
639         return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
640                                    ops_arg, ops_result, lockspace);
641 }
642 
643 static int lkb_idr_free(struct dlm_lkb *lkb)
644 {
645         if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
646                 dlm_free_lvb(lkb->lkb_lvbptr);
647 
648         dlm_free_lkb(lkb);
649         return 0;
650 }
651 
652 /* NOTE: We check the lkbxa here rather than the resource table.
653    This is because there may be LKBs queued as ASTs that have been unlinked
654    from their RSBs and are pending deletion once the AST has been delivered */
655 
656 static int lockspace_busy(struct dlm_ls *ls, int force)
657 {
658         struct dlm_lkb *lkb;
659         unsigned long id;
660         int rv = 0;
661 
662         read_lock_bh(&ls->ls_lkbxa_lock);
663         if (force == 0) {
664                 xa_for_each(&ls->ls_lkbxa, id, lkb) {
665                         rv = 1;
666                         break;
667                 }
668         } else if (force == 1) {
669                 xa_for_each(&ls->ls_lkbxa, id, lkb) {
670                         if (lkb->lkb_nodeid == 0 &&
671                             lkb->lkb_grmode != DLM_LOCK_IV) {
672                                 rv = 1;
673                                 break;
674                         }
675                 }
676         } else {
677                 rv = 0;
678         }
679         read_unlock_bh(&ls->ls_lkbxa_lock);
680         return rv;
681 }
682 
/*
 * Per-element callback for rhashtable_free_and_destroy(): @ptr is the rsb
 * to free, @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
        dlm_free_rsb(ptr);
}
689 
690 static int release_lockspace(struct dlm_ls *ls, int force)
691 {
            /*
             * Drop one create reference on @ls and tear the lockspace down
             * when it was the last one.
             *
             * Returns:
             *   0        the lockspace was torn down
             *   > 0      other create references remain; the value is the
             *            remaining ls_create_count
             *   -EBUSY   last reference, but lockspace_busy() reported lkbs
             *            for this @force level
             *   -EINVAL  ls_create_count was not positive
             */
692         struct dlm_lkb *lkb;
693         unsigned long id;
694         int busy, rv;
695 
696         busy = lockspace_busy(ls, force);
697 
698         spin_lock_bh(&lslist_lock);
699         if (ls->ls_create_count == 1) {
700                 if (busy) {
701                         rv = -EBUSY;
702                 } else {
703                         /* remove_lockspace takes ls off lslist */
704                         ls->ls_create_count = 0;
705                         rv = 0;
706                 }
707         } else if (ls->ls_create_count > 1) {
708                 rv = --ls->ls_create_count;
709         } else {
710                 rv = -EINVAL;
711         }
712         spin_unlock_bh(&lslist_lock);
713 
714         if (rv) {
715                 log_debug(ls, "release_lockspace no remove %d", rv);
716                 return rv;
717         }
718 
719         if (ls_count == 1)
720                 dlm_midcomms_version_wait();
721 
722         dlm_device_deregister(ls);
723 
            /* the leave uevent is skipped for force >= 3 or without a user daemon */
724         if (force < 3 && dlm_user_daemon_available())
725                 do_uevent(ls, 0);
726 
727         dlm_recoverd_stop(ls);
728 
729         /* clear the LSFL_RUNNING flag to speed up
730          * timer_shutdown_sync(), we don't care anymore
731          */
732         clear_bit(LSFL_RUNNING, &ls->ls_flags);
733         timer_shutdown_sync(&ls->ls_scan_timer);
734 
735         if (ls_count == 1) {
736                 dlm_clear_members(ls);
737                 dlm_midcomms_shutdown();
738         }
739 
740         dlm_callback_stop(ls);
741 
            /* waits for ls_count to reach zero, then unlinks ls from lslist */
742         remove_lockspace(ls);
743 
744         dlm_delete_debug_file(ls);
745 
746         xa_destroy(&ls->ls_recover_xa);
747         kfree(ls->ls_recover_buf);
748 
749         /*
750          * Free all lkb's in xa
751          */
752         xa_for_each(&ls->ls_lkbxa, id, lkb) {
753                 lkb_idr_free(lkb);
754         }
755         xa_destroy(&ls->ls_lkbxa);
756 
757         /*
758          * Free all rsb's on rsbtbl
759          */
760         rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
761 
762         /*
763          * Free structures on any other lists
764          */
765 
766         dlm_purge_requestqueue(ls);
767         kfree(ls->ls_recover_args);
768         dlm_clear_members(ls);
769         dlm_clear_members_gone(ls);
770         kfree(ls->ls_node_array);
771         log_rinfo(ls, "release_lockspace final free");
772         kobject_put(&ls->ls_kobj);
773         /* The ls structure will be freed when the kobject is done with */
774 
            /* pairs with the try_module_get() in new_lockspace() */
775         module_put(THIS_MODULE);
776         return 0;
777 }
778 
779 /*
780  * Called when a system has released all its locks and is not going to use the
781  * lockspace any longer.  We free everything we're managing for this lockspace.
782  * Remaining nodes will go through the recovery process as if we'd died.  The
783  * lockspace must continue to function as usual, participating in recoveries,
784  * until this returns.
785  *
786  * Force has 4 possible values:
787  * 0 - don't destroy lockspace if it has any LKBs
788  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
789  * 2 - destroy lockspace regardless of LKBs
790  * 3 - destroy lockspace as part of a forced shutdown
791  */
792 
793 int dlm_release_lockspace(void *lockspace, int force)
794 {
795         struct dlm_ls *ls;
796         int error;
797 
798         ls = dlm_find_lockspace_local(lockspace);
799         if (!ls)
800                 return -EINVAL;
801         dlm_put_lockspace(ls);
802 
803         mutex_lock(&ls_lock);
804         error = release_lockspace(ls, force);
805         if (!error)
806                 ls_count--;
807         if (!ls_count)
808                 dlm_midcomms_stop();
809         mutex_unlock(&ls_lock);
810 
811         return error;
812 }
813 
814 void dlm_stop_lockspaces(void)
815 {
816         struct dlm_ls *ls;
817         int count;
818 
819  restart:
820         count = 0;
821         spin_lock_bh(&lslist_lock);
822         list_for_each_entry(ls, &lslist, ls_list) {
823                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
824                         count++;
825                         continue;
826                 }
827                 spin_unlock_bh(&lslist_lock);
828                 log_error(ls, "no userland control daemon, stopping lockspace");
829                 dlm_ls_stop(ls);
830                 goto restart;
831         }
832         spin_unlock_bh(&lslist_lock);
833 
834         if (count)
835                 log_print("dlm user daemon left %d lockspaces", count);
836 }
837 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

sflogo.php