linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include <linux/module.h>
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
/* Number of lockspaces created on this node; read and updated by
   dlm_new_lockspace() and dlm_release_lockspace() with ls_lock held. */
static int                      ls_count;
/* Serializes lockspace creation and release. */
static struct mutex             ls_lock;
/* List of all lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Protects lslist and the ls_count / ls_create_count of its entries. */
static spinlock_t               lslist_lock;
/* The dlm_scand kthread; set by dlm_scand_start(). */
static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n;
  39        int rc = kstrtoint(buf, 0, &n);
  40
  41        if (rc)
  42                return rc;
  43        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  44        if (!ls)
  45                return -EINVAL;
  46
  47        switch (n) {
  48        case 0:
  49                dlm_ls_stop(ls);
  50                break;
  51        case 1:
  52                dlm_ls_start(ls);
  53                break;
  54        default:
  55                ret = -EINVAL;
  56        }
  57        dlm_put_lockspace(ls);
  58        return ret;
  59}
  60
  61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  62{
  63        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  64
  65        if (rc)
  66                return rc;
  67        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  68        wake_up(&ls->ls_uevent_wait);
  69        return len;
  70}
  71
  72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  73{
  74        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  75}
  76
  77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  78{
  79        int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  80
  81        if (rc)
  82                return rc;
  83        return len;
  84}
  85
  86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  87{
  88        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  89}
  90
  91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  92{
  93        int val;
  94        int rc = kstrtoint(buf, 0, &val);
  95
  96        if (rc)
  97                return rc;
  98        if (val == 1)
  99                set_bit(LSFL_NODIR, &ls->ls_flags);
 100        return len;
 101}
 102
 103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 104{
 105        uint32_t status = dlm_recover_status(ls);
 106        return snprintf(buf, PAGE_SIZE, "%x\n", status);
 107}
 108
 109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 110{
 111        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 112}
 113
/*
 * A lockspace sysfs attribute: the generic attribute plus optional
 * lockspace-typed handlers.  dlm_attr_show()/dlm_attr_store() recover this
 * structure from the embedded attr and dispatch to show/store when set.
 */
struct dlm_attr {
        struct attribute attr;
        /* read handler; may be NULL */
        ssize_t (*show)(struct dlm_ls *, char *);
        /* write handler; may be NULL */
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
 119
/* "control": owner-writable only; handled by dlm_control_store(). */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};
 124
/* "event_done": owner-writable only; handled by dlm_event_store(). */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};
 129
/* "id": world-readable, owner-writable; exposes ls_global_id. */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};
 135
/* "nodir": world-readable, owner-writable; reads dlm_no_directory(),
   writing 1 sets LSFL_NODIR. */
static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};
 141
/* "recover_status": read-only; handled by dlm_recover_status_show(). */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};
 146
/* "recover_nodeid": read-only; handled by dlm_recover_nodeid_show(). */
static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};
 151
/* NULL-terminated list of every attribute a lockspace kobject exposes. */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
/* provides the dlm_groups symbol that dlm_ktype.default_groups refers to */
ATTRIBUTE_GROUPS(dlm);
 162
 163static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 164                             char *buf)
 165{
 166        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 167        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 168        return a->show ? a->show(ls, buf) : 0;
 169}
 170
 171static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 172                              const char *buf, size_t len)
 173{
 174        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 175        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 176        return a->store ? a->store(ls, buf, len) : len;
 177}
 178
 179static void lockspace_kobj_release(struct kobject *k)
 180{
 181        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 182        kfree(ls);
 183}
 184
/* sysfs read/write entry points shared by all lockspace attributes. */
static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};
 189
/* kobject type of dlm_ls.ls_kobj: attribute set, sysfs ops, and the release
   hook that frees the lockspace structure. */
static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};
 195
/* The "dlm" kset created in dlm_lockspace_init(); every lockspace kobject is
   assigned to it in new_lockspace(). */
static struct kset *dlm_kset;
 197
 198static int do_uevent(struct dlm_ls *ls, int in)
 199{
 200        if (in)
 201                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 202        else
 203                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 204
 205        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 206
 207        /* dlm_controld will see the uevent, do the necessary group management
 208           and then write to sysfs to wake us */
 209
 210        wait_event(ls->ls_uevent_wait,
 211                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 212
 213        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
 214
 215        return ls->ls_uevent_result;
 216}
 217
 218static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 219                      struct kobj_uevent_env *env)
 220{
 221        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 222
 223        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 224        return 0;
 225}
 226
/* uevent hooks for the "dlm" kset; passed to kset_create_and_add(). */
static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};
 230
 231int __init dlm_lockspace_init(void)
 232{
 233        ls_count = 0;
 234        mutex_init(&ls_lock);
 235        INIT_LIST_HEAD(&lslist);
 236        spin_lock_init(&lslist_lock);
 237
 238        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 239        if (!dlm_kset) {
 240                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 241                return -ENOMEM;
 242        }
 243        return 0;
 244}
 245
/* Counterpart of dlm_lockspace_init(): unregisters the "dlm" kset. */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
 250
 251static struct dlm_ls *find_ls_to_scan(void)
 252{
 253        struct dlm_ls *ls;
 254
 255        spin_lock(&lslist_lock);
 256        list_for_each_entry(ls, &lslist, ls_list) {
 257                if (time_after_eq(jiffies, ls->ls_scan_time +
 258                                            dlm_config.ci_scan_secs * HZ)) {
 259                        spin_unlock(&lslist_lock);
 260                        return ls;
 261                }
 262        }
 263        spin_unlock(&lslist_lock);
 264        return NULL;
 265}
 266
 267static int dlm_scand(void *data)
 268{
 269        struct dlm_ls *ls;
 270
 271        while (!kthread_should_stop()) {
 272                ls = find_ls_to_scan();
 273                if (ls) {
 274                        if (dlm_lock_recovery_try(ls)) {
 275                                ls->ls_scan_time = jiffies;
 276                                dlm_scan_rsbs(ls);
 277                                dlm_scan_timeout(ls);
 278                                dlm_scan_waiters(ls);
 279                                dlm_unlock_recovery(ls);
 280                        } else {
 281                                ls->ls_scan_time += HZ;
 282                        }
 283                        continue;
 284                }
 285                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 286        }
 287        return 0;
 288}
 289
 290static int dlm_scand_start(void)
 291{
 292        struct task_struct *p;
 293        int error = 0;
 294
 295        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 296        if (IS_ERR(p))
 297                error = PTR_ERR(p);
 298        else
 299                scand_task = p;
 300        return error;
 301}
 302
/* Stop the kthread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
 307
 308struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 309{
 310        struct dlm_ls *ls;
 311
 312        spin_lock(&lslist_lock);
 313
 314        list_for_each_entry(ls, &lslist, ls_list) {
 315                if (ls->ls_global_id == id) {
 316                        ls->ls_count++;
 317                        goto out;
 318                }
 319        }
 320        ls = NULL;
 321 out:
 322        spin_unlock(&lslist_lock);
 323        return ls;
 324}
 325
 326struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 327{
 328        struct dlm_ls *ls;
 329
 330        spin_lock(&lslist_lock);
 331        list_for_each_entry(ls, &lslist, ls_list) {
 332                if (ls->ls_local_handle == lockspace) {
 333                        ls->ls_count++;
 334                        goto out;
 335                }
 336        }
 337        ls = NULL;
 338 out:
 339        spin_unlock(&lslist_lock);
 340        return ls;
 341}
 342
 343struct dlm_ls *dlm_find_lockspace_device(int minor)
 344{
 345        struct dlm_ls *ls;
 346
 347        spin_lock(&lslist_lock);
 348        list_for_each_entry(ls, &lslist, ls_list) {
 349                if (ls->ls_device.minor == minor) {
 350                        ls->ls_count++;
 351                        goto out;
 352                }
 353        }
 354        ls = NULL;
 355 out:
 356        spin_unlock(&lslist_lock);
 357        return ls;
 358}
 359
/*
 * Drop a reference taken by one of the dlm_find_lockspace_*() lookups by
 * decrementing ls_count under lslist_lock.  remove_lockspace() waits for
 * this count to reach zero before unlinking the lockspace.
 */
void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
 366
 367static void remove_lockspace(struct dlm_ls *ls)
 368{
 369        for (;;) {
 370                spin_lock(&lslist_lock);
 371                if (ls->ls_count == 0) {
 372                        WARN_ON(ls->ls_create_count != 0);
 373                        list_del(&ls->ls_list);
 374                        spin_unlock(&lslist_lock);
 375                        return;
 376                }
 377                spin_unlock(&lslist_lock);
 378                ssleep(1);
 379        }
 380}
 381
 382static int threads_start(void)
 383{
 384        int error;
 385
 386        error = dlm_scand_start();
 387        if (error) {
 388                log_print("cannot start dlm_scand thread %d", error);
 389                goto fail;
 390        }
 391
 392        /* Thread for sending/receiving messages for all lockspace's */
 393        error = dlm_lowcomms_start();
 394        if (error) {
 395                log_print("cannot start dlm lowcomms %d", error);
 396                goto scand_fail;
 397        }
 398
 399        return 0;
 400
 401 scand_fail:
 402        dlm_scand_stop();
 403 fail:
 404        return error;
 405}
 406
/*
 * Create a lockspace called name, or attach to an existing one of that name.
 *
 * name        lockspace name; length must be 1..DLM_LOCKSPACE_LEN
 * cluster     cluster name expected by the caller, may be NULL
 * flags       DLM_LSFL_* flags
 * lvblen      lock value block length; non-zero multiple of 8
 * ops/ops_arg recovery callbacks, used only if ci_recover_callbacks is set
 * ops_result  if ops and ops_result are both non-NULL, receives 0 or
 *             -EOPNOTSUPP depending on ci_recover_callbacks
 * lockspace   receives the lockspace handle on return values 0 and 1
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an existing
 * lockspace was found (its ls_create_count is incremented), or a negative
 * errno.  A module reference is taken up front and dropped again on every
 * non-zero return; on 0 it is kept (release_lockspace() drops it).
 */
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        /* set once ls_kobj owns ls, so that errors free ls via kobject_put() */
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        /* tell the caller whether its recovery callbacks will be used */
        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        /* look for an existing lockspace with the same name */
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                /* positive return: existing lockspace reused */
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        /* default error for the allocation failures below */
        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);

        /* NOTE(review): this allocation uses GFP_KERNEL while the other
           allocations in this function use GFP_NOFS - confirm intended.
           Entries not yet allocated are NULL (ls came from kzalloc), so
           the out_rsbtbl loop may kfree() all of them. */
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        /* the handle given to callers is the dlm_ls pointer itself */
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        /* publish the lockspace; from here the find functions can see it */
        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

        /* error unwinding: each label undoes the steps completed before the
           failure, in reverse order, and falls through to the next */
 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        /* after kobject init, ls is freed by lockspace_kobj_release() */
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}
 679
 680int dlm_new_lockspace(const char *name, const char *cluster,
 681                      uint32_t flags, int lvblen,
 682                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 683                      int *ops_result, dlm_lockspace_t **lockspace)
 684{
 685        int error = 0;
 686
 687        mutex_lock(&ls_lock);
 688        if (!ls_count)
 689                error = threads_start();
 690        if (error)
 691                goto out;
 692
 693        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 694                              ops_result, lockspace);
 695        if (!error)
 696                ls_count++;
 697        if (error > 0)
 698                error = 0;
 699        if (!ls_count) {
 700                dlm_scand_stop();
 701                dlm_lowcomms_shutdown();
 702                dlm_lowcomms_stop();
 703        }
 704 out:
 705        mutex_unlock(&ls_lock);
 706        return error;
 707}
 708
 709static int lkb_idr_is_local(int id, void *p, void *data)
 710{
 711        struct dlm_lkb *lkb = p;
 712
 713        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 714}
 715
 716static int lkb_idr_is_any(int id, void *p, void *data)
 717{
 718        return 1;
 719}
 720
 721static int lkb_idr_free(int id, void *p, void *data)
 722{
 723        struct dlm_lkb *lkb = p;
 724
 725        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 726                dlm_free_lvb(lkb->lkb_lvbptr);
 727
 728        dlm_free_lkb(lkb);
 729        return 0;
 730}
 731
 732/* NOTE: We check the lkbidr here rather than the resource table.
 733   This is because there may be LKBs queued as ASTs that have been unlinked
 734   from their RSBs and are pending deletion once the AST has been delivered */
 735
 736static int lockspace_busy(struct dlm_ls *ls, int force)
 737{
 738        int rv;
 739
 740        spin_lock(&ls->ls_lkbidr_spin);
 741        if (force == 0) {
 742                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 743        } else if (force == 1) {
 744                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 745        } else {
 746                rv = 0;
 747        }
 748        spin_unlock(&ls->ls_lkbidr_spin);
 749        return rv;
 750}
 751
/*
 * Drop one creator reference on ls and tear the lockspace down when it was
 * the last one.  The only caller in this file, dlm_release_lockspace(),
 * holds ls_lock.
 *
 * Returns:
 *   0        the lockspace was torn down (ls is freed via kobject_put())
 *   > 0      other creators remain; the value is the new ls_create_count
 *   -EBUSY   last creator, but lockspace_busy() reports blocking LKBs
 *   -EINVAL  ls_create_count was not positive
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        /* tell userspace we are leaving, unless this is a forced shutdown
           (force 3) or the daemon is not available */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* last lockspace: stop the threads shared by all lockspaces */
        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_lowcomms_shutdown();
        }

        dlm_callback_stop(ls);

        /* waits until no lookup reference (ls_count) is left on ls */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        /* rsbs still sitting on the ls_new_rsb list */
        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* pairs with try_module_get() in new_lockspace() */
        module_put(THIS_MODULE);
        return 0;
}
 855
 856/*
 857 * Called when a system has released all its locks and is not going to use the
 858 * lockspace any longer.  We free everything we're managing for this lockspace.
 859 * Remaining nodes will go through the recovery process as if we'd died.  The
 860 * lockspace must continue to function as usual, participating in recoveries,
 861 * until this returns.
 862 *
 863 * Force has 4 possible values:
 864 * 0 - don't destroy locksapce if it has any LKBs
 865 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 866 * 2 - destroy lockspace regardless of LKBs
 867 * 3 - destroy lockspace as part of a forced shutdown
 868 */
 869
 870int dlm_release_lockspace(void *lockspace, int force)
 871{
 872        struct dlm_ls *ls;
 873        int error;
 874
 875        ls = dlm_find_lockspace_local(lockspace);
 876        if (!ls)
 877                return -EINVAL;
 878        dlm_put_lockspace(ls);
 879
 880        mutex_lock(&ls_lock);
 881        error = release_lockspace(ls, force);
 882        if (!error)
 883                ls_count--;
 884        if (!ls_count)
 885                dlm_lowcomms_stop();
 886        mutex_unlock(&ls_lock);
 887
 888        return error;
 889}
 890
 891void dlm_stop_lockspaces(void)
 892{
 893        struct dlm_ls *ls;
 894        int count;
 895
 896 restart:
 897        count = 0;
 898        spin_lock(&lslist_lock);
 899        list_for_each_entry(ls, &lslist, ls_list) {
 900                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 901                        count++;
 902                        continue;
 903                }
 904                spin_unlock(&lslist_lock);
 905                log_error(ls, "no userland control daemon, stopping lockspace");
 906                dlm_ls_stop(ls);
 907                goto restart;
 908        }
 909        spin_unlock(&lslist_lock);
 910
 911        if (count)
 912                log_print("dlm user daemon left %d lockspaces", count);
 913}
 914
 915