linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
  28static int                      ls_count;
  29static struct mutex             ls_lock;
  30static struct list_head         lslist;
  31static spinlock_t               lslist_lock;
  32static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n = simple_strtol(buf, NULL, 0);
  39
  40        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  41        if (!ls)
  42                return -EINVAL;
  43
  44        switch (n) {
  45        case 0:
  46                dlm_ls_stop(ls);
  47                break;
  48        case 1:
  49                dlm_ls_start(ls);
  50                break;
  51        default:
  52                ret = -EINVAL;
  53        }
  54        dlm_put_lockspace(ls);
  55        return ret;
  56}
  57
  58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  59{
  60        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
  61        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  62        wake_up(&ls->ls_uevent_wait);
  63        return len;
  64}
  65
  66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  67{
  68        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  69}
  70
  71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  72{
  73        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
  74        return len;
  75}
  76
  77static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
  78{
  79        uint32_t status = dlm_recover_status(ls);
  80        return snprintf(buf, PAGE_SIZE, "%x\n", status);
  81}
  82
  83static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
  84{
  85        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
  86}
  87
  88struct dlm_attr {
  89        struct attribute attr;
  90        ssize_t (*show)(struct dlm_ls *, char *);
  91        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
  92};
  93
  94static struct dlm_attr dlm_attr_control = {
  95        .attr  = {.name = "control", .mode = S_IWUSR},
  96        .store = dlm_control_store
  97};
  98
  99static struct dlm_attr dlm_attr_event = {
 100        .attr  = {.name = "event_done", .mode = S_IWUSR},
 101        .store = dlm_event_store
 102};
 103
 104static struct dlm_attr dlm_attr_id = {
 105        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 106        .show  = dlm_id_show,
 107        .store = dlm_id_store
 108};
 109
 110static struct dlm_attr dlm_attr_recover_status = {
 111        .attr  = {.name = "recover_status", .mode = S_IRUGO},
 112        .show  = dlm_recover_status_show
 113};
 114
 115static struct dlm_attr dlm_attr_recover_nodeid = {
 116        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 117        .show  = dlm_recover_nodeid_show
 118};
 119
 120static struct attribute *dlm_attrs[] = {
 121        &dlm_attr_control.attr,
 122        &dlm_attr_event.attr,
 123        &dlm_attr_id.attr,
 124        &dlm_attr_recover_status.attr,
 125        &dlm_attr_recover_nodeid.attr,
 126        NULL,
 127};
 128
 129static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 130                             char *buf)
 131{
 132        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 133        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 134        return a->show ? a->show(ls, buf) : 0;
 135}
 136
 137static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 138                              const char *buf, size_t len)
 139{
 140        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 141        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 142        return a->store ? a->store(ls, buf, len) : len;
 143}
 144
 145static void lockspace_kobj_release(struct kobject *k)
 146{
 147        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 148        kfree(ls);
 149}
 150
 151static const struct sysfs_ops dlm_attr_ops = {
 152        .show  = dlm_attr_show,
 153        .store = dlm_attr_store,
 154};
 155
 156static struct kobj_type dlm_ktype = {
 157        .default_attrs = dlm_attrs,
 158        .sysfs_ops     = &dlm_attr_ops,
 159        .release       = lockspace_kobj_release,
 160};
 161
 162static struct kset *dlm_kset;
 163
 164static int do_uevent(struct dlm_ls *ls, int in)
 165{
 166        int error;
 167
 168        if (in)
 169                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 170        else
 171                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 172
 173        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 174
 175        /* dlm_controld will see the uevent, do the necessary group management
 176           and then write to sysfs to wake us */
 177
 178        error = wait_event_interruptible(ls->ls_uevent_wait,
 179                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 180
 181        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
 182
 183        if (error)
 184                goto out;
 185
 186        error = ls->ls_uevent_result;
 187 out:
 188        if (error)
 189                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 190                          error, ls->ls_uevent_result);
 191        return error;
 192}
 193
 194static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 195                      struct kobj_uevent_env *env)
 196{
 197        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 198
 199        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 200        return 0;
 201}
 202
 203static struct kset_uevent_ops dlm_uevent_ops = {
 204        .uevent = dlm_uevent,
 205};
 206
 207int __init dlm_lockspace_init(void)
 208{
 209        ls_count = 0;
 210        mutex_init(&ls_lock);
 211        INIT_LIST_HEAD(&lslist);
 212        spin_lock_init(&lslist_lock);
 213
 214        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 215        if (!dlm_kset) {
 216                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 217                return -ENOMEM;
 218        }
 219        return 0;
 220}
 221
 222void dlm_lockspace_exit(void)
 223{
 224        kset_unregister(dlm_kset);
 225}
 226
 227static struct dlm_ls *find_ls_to_scan(void)
 228{
 229        struct dlm_ls *ls;
 230
 231        spin_lock(&lslist_lock);
 232        list_for_each_entry(ls, &lslist, ls_list) {
 233                if (time_after_eq(jiffies, ls->ls_scan_time +
 234                                            dlm_config.ci_scan_secs * HZ)) {
 235                        spin_unlock(&lslist_lock);
 236                        return ls;
 237                }
 238        }
 239        spin_unlock(&lslist_lock);
 240        return NULL;
 241}
 242
 243static int dlm_scand(void *data)
 244{
 245        struct dlm_ls *ls;
 246
 247        while (!kthread_should_stop()) {
 248                ls = find_ls_to_scan();
 249                if (ls) {
 250                        if (dlm_lock_recovery_try(ls)) {
 251                                ls->ls_scan_time = jiffies;
 252                                dlm_scan_rsbs(ls);
 253                                dlm_scan_timeout(ls);
 254                                dlm_scan_waiters(ls);
 255                                dlm_unlock_recovery(ls);
 256                        } else {
 257                                ls->ls_scan_time += HZ;
 258                        }
 259                        continue;
 260                }
 261                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 262        }
 263        return 0;
 264}
 265
 266static int dlm_scand_start(void)
 267{
 268        struct task_struct *p;
 269        int error = 0;
 270
 271        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 272        if (IS_ERR(p))
 273                error = PTR_ERR(p);
 274        else
 275                scand_task = p;
 276        return error;
 277}
 278
 279static void dlm_scand_stop(void)
 280{
 281        kthread_stop(scand_task);
 282}
 283
 284struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 285{
 286        struct dlm_ls *ls;
 287
 288        spin_lock(&lslist_lock);
 289
 290        list_for_each_entry(ls, &lslist, ls_list) {
 291                if (ls->ls_global_id == id) {
 292                        ls->ls_count++;
 293                        goto out;
 294                }
 295        }
 296        ls = NULL;
 297 out:
 298        spin_unlock(&lslist_lock);
 299        return ls;
 300}
 301
 302struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 303{
 304        struct dlm_ls *ls;
 305
 306        spin_lock(&lslist_lock);
 307        list_for_each_entry(ls, &lslist, ls_list) {
 308                if (ls->ls_local_handle == lockspace) {
 309                        ls->ls_count++;
 310                        goto out;
 311                }
 312        }
 313        ls = NULL;
 314 out:
 315        spin_unlock(&lslist_lock);
 316        return ls;
 317}
 318
 319struct dlm_ls *dlm_find_lockspace_device(int minor)
 320{
 321        struct dlm_ls *ls;
 322
 323        spin_lock(&lslist_lock);
 324        list_for_each_entry(ls, &lslist, ls_list) {
 325                if (ls->ls_device.minor == minor) {
 326                        ls->ls_count++;
 327                        goto out;
 328                }
 329        }
 330        ls = NULL;
 331 out:
 332        spin_unlock(&lslist_lock);
 333        return ls;
 334}
 335
 336void dlm_put_lockspace(struct dlm_ls *ls)
 337{
 338        spin_lock(&lslist_lock);
 339        ls->ls_count--;
 340        spin_unlock(&lslist_lock);
 341}
 342
 343static void remove_lockspace(struct dlm_ls *ls)
 344{
 345        for (;;) {
 346                spin_lock(&lslist_lock);
 347                if (ls->ls_count == 0) {
 348                        WARN_ON(ls->ls_create_count != 0);
 349                        list_del(&ls->ls_list);
 350                        spin_unlock(&lslist_lock);
 351                        return;
 352                }
 353                spin_unlock(&lslist_lock);
 354                ssleep(1);
 355        }
 356}
 357
 358static int threads_start(void)
 359{
 360        int error;
 361
 362        error = dlm_scand_start();
 363        if (error) {
 364                log_print("cannot start dlm_scand thread %d", error);
 365                goto fail;
 366        }
 367
 368        /* Thread for sending/receiving messages for all lockspace's */
 369        error = dlm_lowcomms_start();
 370        if (error) {
 371                log_print("cannot start dlm lowcomms %d", error);
 372                goto scand_fail;
 373        }
 374
 375        return 0;
 376
 377 scand_fail:
 378        dlm_scand_stop();
 379 fail:
 380        return error;
 381}
 382
 383static void threads_stop(void)
 384{
 385        dlm_scand_stop();
 386        dlm_lowcomms_stop();
 387}
 388
 389static int new_lockspace(const char *name, const char *cluster,
 390                         uint32_t flags, int lvblen,
 391                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 392                         int *ops_result, dlm_lockspace_t **lockspace)
 393{
 394        struct dlm_ls *ls;
 395        int i, size, error;
 396        int do_unreg = 0;
 397        int namelen = strlen(name);
 398
 399        if (namelen > DLM_LOCKSPACE_LEN)
 400                return -EINVAL;
 401
 402        if (!lvblen || (lvblen % 8))
 403                return -EINVAL;
 404
 405        if (!try_module_get(THIS_MODULE))
 406                return -EINVAL;
 407
 408        if (!dlm_user_daemon_available()) {
 409                log_print("dlm user daemon not available");
 410                error = -EUNATCH;
 411                goto out;
 412        }
 413
 414        if (ops && ops_result) {
 415                if (!dlm_config.ci_recover_callbacks)
 416                        *ops_result = -EOPNOTSUPP;
 417                else
 418                        *ops_result = 0;
 419        }
 420
 421        if (dlm_config.ci_recover_callbacks && cluster &&
 422            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 423                log_print("dlm cluster name %s mismatch %s",
 424                          dlm_config.ci_cluster_name, cluster);
 425                error = -EBADR;
 426                goto out;
 427        }
 428
 429        error = 0;
 430
 431        spin_lock(&lslist_lock);
 432        list_for_each_entry(ls, &lslist, ls_list) {
 433                WARN_ON(ls->ls_create_count <= 0);
 434                if (ls->ls_namelen != namelen)
 435                        continue;
 436                if (memcmp(ls->ls_name, name, namelen))
 437                        continue;
 438                if (flags & DLM_LSFL_NEWEXCL) {
 439                        error = -EEXIST;
 440                        break;
 441                }
 442                ls->ls_create_count++;
 443                *lockspace = ls;
 444                error = 1;
 445                break;
 446        }
 447        spin_unlock(&lslist_lock);
 448
 449        if (error)
 450                goto out;
 451
 452        error = -ENOMEM;
 453
 454        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 455        if (!ls)
 456                goto out;
 457        memcpy(ls->ls_name, name, namelen);
 458        ls->ls_namelen = namelen;
 459        ls->ls_lvblen = lvblen;
 460        ls->ls_count = 0;
 461        ls->ls_flags = 0;
 462        ls->ls_scan_time = jiffies;
 463
 464        if (ops && dlm_config.ci_recover_callbacks) {
 465                ls->ls_ops = ops;
 466                ls->ls_ops_arg = ops_arg;
 467        }
 468
 469        if (flags & DLM_LSFL_TIMEWARN)
 470                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 471
 472        /* ls_exflags are forced to match among nodes, and we don't
 473           need to require all nodes to have some flags set */
 474        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 475                                    DLM_LSFL_NEWEXCL));
 476
 477        size = dlm_config.ci_rsbtbl_size;
 478        ls->ls_rsbtbl_size = size;
 479
 480        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
 481        if (!ls->ls_rsbtbl)
 482                goto out_lsfree;
 483        for (i = 0; i < size; i++) {
 484                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 485                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 486                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 487        }
 488
 489        idr_init(&ls->ls_lkbidr);
 490        spin_lock_init(&ls->ls_lkbidr_spin);
 491
 492        size = dlm_config.ci_dirtbl_size;
 493        ls->ls_dirtbl_size = size;
 494
 495        ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
 496        if (!ls->ls_dirtbl)
 497                goto out_lkbfree;
 498        for (i = 0; i < size; i++) {
 499                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
 500                spin_lock_init(&ls->ls_dirtbl[i].lock);
 501        }
 502
 503        INIT_LIST_HEAD(&ls->ls_waiters);
 504        mutex_init(&ls->ls_waiters_mutex);
 505        INIT_LIST_HEAD(&ls->ls_orphans);
 506        mutex_init(&ls->ls_orphans_mutex);
 507        INIT_LIST_HEAD(&ls->ls_timeout);
 508        mutex_init(&ls->ls_timeout_mutex);
 509
 510        INIT_LIST_HEAD(&ls->ls_new_rsb);
 511        spin_lock_init(&ls->ls_new_rsb_spin);
 512
 513        INIT_LIST_HEAD(&ls->ls_nodes);
 514        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 515        ls->ls_num_nodes = 0;
 516        ls->ls_low_nodeid = 0;
 517        ls->ls_total_weight = 0;
 518        ls->ls_node_array = NULL;
 519
 520        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 521        ls->ls_stub_rsb.res_ls = ls;
 522
 523        ls->ls_debug_rsb_dentry = NULL;
 524        ls->ls_debug_waiters_dentry = NULL;
 525
 526        init_waitqueue_head(&ls->ls_uevent_wait);
 527        ls->ls_uevent_result = 0;
 528        init_completion(&ls->ls_members_done);
 529        ls->ls_members_result = -1;
 530
 531        mutex_init(&ls->ls_cb_mutex);
 532        INIT_LIST_HEAD(&ls->ls_cb_delay);
 533
 534        ls->ls_recoverd_task = NULL;
 535        mutex_init(&ls->ls_recoverd_active);
 536        spin_lock_init(&ls->ls_recover_lock);
 537        spin_lock_init(&ls->ls_rcom_spin);
 538        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 539        ls->ls_recover_status = 0;
 540        ls->ls_recover_seq = 0;
 541        ls->ls_recover_args = NULL;
 542        init_rwsem(&ls->ls_in_recovery);
 543        init_rwsem(&ls->ls_recv_active);
 544        INIT_LIST_HEAD(&ls->ls_requestqueue);
 545        mutex_init(&ls->ls_requestqueue_mutex);
 546        mutex_init(&ls->ls_clear_proc_locks);
 547
 548        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 549        if (!ls->ls_recover_buf)
 550                goto out_dirfree;
 551
 552        ls->ls_slot = 0;
 553        ls->ls_num_slots = 0;
 554        ls->ls_slots_size = 0;
 555        ls->ls_slots = NULL;
 556
 557        INIT_LIST_HEAD(&ls->ls_recover_list);
 558        spin_lock_init(&ls->ls_recover_list_lock);
 559        ls->ls_recover_list_count = 0;
 560        ls->ls_local_handle = ls;
 561        init_waitqueue_head(&ls->ls_wait_general);
 562        INIT_LIST_HEAD(&ls->ls_root_list);
 563        init_rwsem(&ls->ls_root_sem);
 564
 565        down_write(&ls->ls_in_recovery);
 566
 567        spin_lock(&lslist_lock);
 568        ls->ls_create_count = 1;
 569        list_add(&ls->ls_list, &lslist);
 570        spin_unlock(&lslist_lock);
 571
 572        if (flags & DLM_LSFL_FS) {
 573                error = dlm_callback_start(ls);
 574                if (error) {
 575                        log_error(ls, "can't start dlm_callback %d", error);
 576                        goto out_delist;
 577                }
 578        }
 579
 580        /* needs to find ls in lslist */
 581        error = dlm_recoverd_start(ls);
 582        if (error) {
 583                log_error(ls, "can't start dlm_recoverd %d", error);
 584                goto out_callback;
 585        }
 586
 587        ls->ls_kobj.kset = dlm_kset;
 588        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 589                                     "%s", ls->ls_name);
 590        if (error)
 591                goto out_recoverd;
 592        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 593
 594        /* let kobject handle freeing of ls if there's an error */
 595        do_unreg = 1;
 596
 597        /* This uevent triggers dlm_controld in userspace to add us to the
 598           group of nodes that are members of this lockspace (managed by the
 599           cluster infrastructure.)  Once it's done that, it tells us who the
 600           current lockspace members are (via configfs) and then tells the
 601           lockspace to start running (via sysfs) in dlm_ls_start(). */
 602
 603        error = do_uevent(ls, 1);
 604        if (error)
 605                goto out_recoverd;
 606
 607        wait_for_completion(&ls->ls_members_done);
 608        error = ls->ls_members_result;
 609        if (error)
 610                goto out_members;
 611
 612        dlm_create_debug_file(ls);
 613
 614        log_debug(ls, "join complete");
 615        *lockspace = ls;
 616        return 0;
 617
 618 out_members:
 619        do_uevent(ls, 0);
 620        dlm_clear_members(ls);
 621        kfree(ls->ls_node_array);
 622 out_recoverd:
 623        dlm_recoverd_stop(ls);
 624 out_callback:
 625        dlm_callback_stop(ls);
 626 out_delist:
 627        spin_lock(&lslist_lock);
 628        list_del(&ls->ls_list);
 629        spin_unlock(&lslist_lock);
 630        kfree(ls->ls_recover_buf);
 631 out_dirfree:
 632        vfree(ls->ls_dirtbl);
 633 out_lkbfree:
 634        idr_destroy(&ls->ls_lkbidr);
 635        vfree(ls->ls_rsbtbl);
 636 out_lsfree:
 637        if (do_unreg)
 638                kobject_put(&ls->ls_kobj);
 639        else
 640                kfree(ls);
 641 out:
 642        module_put(THIS_MODULE);
 643        return error;
 644}
 645
 646int dlm_new_lockspace(const char *name, const char *cluster,
 647                      uint32_t flags, int lvblen,
 648                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 649                      int *ops_result, dlm_lockspace_t **lockspace)
 650{
 651        int error = 0;
 652
 653        mutex_lock(&ls_lock);
 654        if (!ls_count)
 655                error = threads_start();
 656        if (error)
 657                goto out;
 658
 659        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 660                              ops_result, lockspace);
 661        if (!error)
 662                ls_count++;
 663        if (error > 0)
 664                error = 0;
 665        if (!ls_count)
 666                threads_stop();
 667 out:
 668        mutex_unlock(&ls_lock);
 669        return error;
 670}
 671
 672static int lkb_idr_is_local(int id, void *p, void *data)
 673{
 674        struct dlm_lkb *lkb = p;
 675
 676        if (!lkb->lkb_nodeid)
 677                return 1;
 678        return 0;
 679}
 680
 681static int lkb_idr_is_any(int id, void *p, void *data)
 682{
 683        return 1;
 684}
 685
 686static int lkb_idr_free(int id, void *p, void *data)
 687{
 688        struct dlm_lkb *lkb = p;
 689
 690        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 691                dlm_free_lvb(lkb->lkb_lvbptr);
 692
 693        dlm_free_lkb(lkb);
 694        return 0;
 695}
 696
 697/* NOTE: We check the lkbidr here rather than the resource table.
 698   This is because there may be LKBs queued as ASTs that have been unlinked
 699   from their RSBs and are pending deletion once the AST has been delivered */
 700
 701static int lockspace_busy(struct dlm_ls *ls, int force)
 702{
 703        int rv;
 704
 705        spin_lock(&ls->ls_lkbidr_spin);
 706        if (force == 0) {
 707                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 708        } else if (force == 1) {
 709                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 710        } else {
 711                rv = 0;
 712        }
 713        spin_unlock(&ls->ls_lkbidr_spin);
 714        return rv;
 715}
 716
 717static int release_lockspace(struct dlm_ls *ls, int force)
 718{
 719        struct dlm_rsb *rsb;
 720        struct rb_node *n;
 721        int i, busy, rv;
 722
 723        busy = lockspace_busy(ls, force);
 724
 725        spin_lock(&lslist_lock);
 726        if (ls->ls_create_count == 1) {
 727                if (busy) {
 728                        rv = -EBUSY;
 729                } else {
 730                        /* remove_lockspace takes ls off lslist */
 731                        ls->ls_create_count = 0;
 732                        rv = 0;
 733                }
 734        } else if (ls->ls_create_count > 1) {
 735                rv = --ls->ls_create_count;
 736        } else {
 737                rv = -EINVAL;
 738        }
 739        spin_unlock(&lslist_lock);
 740
 741        if (rv) {
 742                log_debug(ls, "release_lockspace no remove %d", rv);
 743                return rv;
 744        }
 745
 746        dlm_device_deregister(ls);
 747
 748        if (force < 3 && dlm_user_daemon_available())
 749                do_uevent(ls, 0);
 750
 751        dlm_recoverd_stop(ls);
 752
 753        dlm_callback_stop(ls);
 754
 755        remove_lockspace(ls);
 756
 757        dlm_delete_debug_file(ls);
 758
 759        kfree(ls->ls_recover_buf);
 760
 761        /*
 762         * Free direntry structs.
 763         */
 764
 765        dlm_dir_clear(ls);
 766        vfree(ls->ls_dirtbl);
 767
 768        /*
 769         * Free all lkb's in idr
 770         */
 771
 772        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 773        idr_remove_all(&ls->ls_lkbidr);
 774        idr_destroy(&ls->ls_lkbidr);
 775
 776        /*
 777         * Free all rsb's on rsbtbl[] lists
 778         */
 779
 780        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 781                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 782                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 783                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
 784                        dlm_free_rsb(rsb);
 785                }
 786
 787                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 788                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 789                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
 790                        dlm_free_rsb(rsb);
 791                }
 792        }
 793
 794        vfree(ls->ls_rsbtbl);
 795
 796        while (!list_empty(&ls->ls_new_rsb)) {
 797                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 798                                       res_hashchain);
 799                list_del(&rsb->res_hashchain);
 800                dlm_free_rsb(rsb);
 801        }
 802
 803        /*
 804         * Free structures on any other lists
 805         */
 806
 807        dlm_purge_requestqueue(ls);
 808        kfree(ls->ls_recover_args);
 809        dlm_clear_free_entries(ls);
 810        dlm_clear_members(ls);
 811        dlm_clear_members_gone(ls);
 812        kfree(ls->ls_node_array);
 813        log_debug(ls, "release_lockspace final free");
 814        kobject_put(&ls->ls_kobj);
 815        /* The ls structure will be freed when the kobject is done with */
 816
 817        module_put(THIS_MODULE);
 818        return 0;
 819}
 820
 821/*
 822 * Called when a system has released all its locks and is not going to use the
 823 * lockspace any longer.  We free everything we're managing for this lockspace.
 824 * Remaining nodes will go through the recovery process as if we'd died.  The
 825 * lockspace must continue to function as usual, participating in recoveries,
 826 * until this returns.
 827 *
 828 * Force has 4 possible values:
 829 * 0 - don't destroy locksapce if it has any LKBs
 830 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 831 * 2 - destroy lockspace regardless of LKBs
 832 * 3 - destroy lockspace as part of a forced shutdown
 833 */
 834
 835int dlm_release_lockspace(void *lockspace, int force)
 836{
 837        struct dlm_ls *ls;
 838        int error;
 839
 840        ls = dlm_find_lockspace_local(lockspace);
 841        if (!ls)
 842                return -EINVAL;
 843        dlm_put_lockspace(ls);
 844
 845        mutex_lock(&ls_lock);
 846        error = release_lockspace(ls, force);
 847        if (!error)
 848                ls_count--;
 849        if (!ls_count)
 850                threads_stop();
 851        mutex_unlock(&ls_lock);
 852
 853        return error;
 854}
 855
 856void dlm_stop_lockspaces(void)
 857{
 858        struct dlm_ls *ls;
 859
 860 restart:
 861        spin_lock(&lslist_lock);
 862        list_for_each_entry(ls, &lslist, ls_list) {
 863                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
 864                        continue;
 865                spin_unlock(&lslist_lock);
 866                log_error(ls, "no userland control daemon, stopping lockspace");
 867                dlm_ls_stop(ls);
 868                goto restart;
 869        }
 870        spin_unlock(&lslist_lock);
 871}
 872
 873
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.