/* linux/fs/dlm/lockspace.c */
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
/* number of lockspaces created; protected by ls_lock */
static int                      ls_count;
/* serializes lockspace create/release and global thread start/stop */
static struct mutex             ls_lock;
/* list of all lockspaces; protected by lslist_lock */
static struct list_head         lslist;
static spinlock_t               lslist_lock;
/* the dlm_scand kthread; started with the first lockspace, stopped with
   the last — see threads_start()/threads_stop() */
static struct task_struct *     scand_task;
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n = simple_strtol(buf, NULL, 0);
  39
  40        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  41        if (!ls)
  42                return -EINVAL;
  43
  44        switch (n) {
  45        case 0:
  46                dlm_ls_stop(ls);
  47                break;
  48        case 1:
  49                dlm_ls_start(ls);
  50                break;
  51        default:
  52                ret = -EINVAL;
  53        }
  54        dlm_put_lockspace(ls);
  55        return ret;
  56}
  57
  58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  59{
  60        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
  61        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  62        wake_up(&ls->ls_uevent_wait);
  63        return len;
  64}
  65
  66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  67{
  68        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  69}
  70
  71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  72{
  73        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
  74        return len;
  75}
  76
  77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  78{
  79        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  80}
  81
  82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  83{
  84        int val = simple_strtoul(buf, NULL, 0);
  85        if (val == 1)
  86                set_bit(LSFL_NODIR, &ls->ls_flags);
  87        return len;
  88}
  89
  90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
  91{
  92        uint32_t status = dlm_recover_status(ls);
  93        return snprintf(buf, PAGE_SIZE, "%x\n", status);
  94}
  95
  96static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
  97{
  98        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
  99}
 100
/* A sysfs attribute for a lockspace kobject, paired with the show/store
   callbacks that implement it.  Either callback may be NULL; see
   dlm_attr_show()/dlm_attr_store() for how that is handled. */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
 106
/* write-only: 0 = stop, 1 = start the lockspace (dlm_control_store) */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* write-only: userspace reports the join/leave result (dlm_event_store) */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* read/write: the global lockspace id */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* read/write: no-directory mode flag (write 1 to set; never cleared) */
static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

/* read-only: recovery status bits, in hex */
static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

/* read-only: nodeid currently being recovered */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

/* all attributes installed in each lockspace's sysfs directory */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
 148
 149static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 150                             char *buf)
 151{
 152        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 153        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 154        return a->show ? a->show(ls, buf) : 0;
 155}
 156
 157static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 158                              const char *buf, size_t len)
 159{
 160        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 161        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 162        return a->store ? a->store(ls, buf, len) : len;
 163}
 164
 165static void lockspace_kobj_release(struct kobject *k)
 166{
 167        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 168        kfree(ls);
 169}
 170
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for each lockspace's sysfs directory; release frees the
   embedding dlm_ls (see lockspace_kobj_release) */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

/* /sys/kernel/dlm; created in dlm_lockspace_init() */
static struct kset *dlm_kset;
 183
 184static int do_uevent(struct dlm_ls *ls, int in)
 185{
 186        int error;
 187
 188        if (in)
 189                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 190        else
 191                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 192
 193        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 194
 195        /* dlm_controld will see the uevent, do the necessary group management
 196           and then write to sysfs to wake us */
 197
 198        error = wait_event_interruptible(ls->ls_uevent_wait,
 199                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 200
 201        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
 202
 203        if (error)
 204                goto out;
 205
 206        error = ls->ls_uevent_result;
 207 out:
 208        if (error)
 209                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 210                          error, ls->ls_uevent_result);
 211        return error;
 212}
 213
 214static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 215                      struct kobj_uevent_env *env)
 216{
 217        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 218
 219        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 220        return 0;
 221}
 222
/* uevent ops for the dlm kset; dlm_uevent tags events with LOCKSPACE= */
static struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
 226
 227int __init dlm_lockspace_init(void)
 228{
 229        ls_count = 0;
 230        mutex_init(&ls_lock);
 231        INIT_LIST_HEAD(&lslist);
 232        spin_lock_init(&lslist_lock);
 233
 234        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 235        if (!dlm_kset) {
 236                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 237                return -ENOMEM;
 238        }
 239        return 0;
 240}
 241
/* Module exit: drop the /sys/kernel/dlm kset registered at init. */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
 246
 247static struct dlm_ls *find_ls_to_scan(void)
 248{
 249        struct dlm_ls *ls;
 250
 251        spin_lock(&lslist_lock);
 252        list_for_each_entry(ls, &lslist, ls_list) {
 253                if (time_after_eq(jiffies, ls->ls_scan_time +
 254                                            dlm_config.ci_scan_secs * HZ)) {
 255                        spin_unlock(&lslist_lock);
 256                        return ls;
 257                }
 258        }
 259        spin_unlock(&lslist_lock);
 260        return NULL;
 261}
 262
 263static int dlm_scand(void *data)
 264{
 265        struct dlm_ls *ls;
 266
 267        while (!kthread_should_stop()) {
 268                ls = find_ls_to_scan();
 269                if (ls) {
 270                        if (dlm_lock_recovery_try(ls)) {
 271                                ls->ls_scan_time = jiffies;
 272                                dlm_scan_rsbs(ls);
 273                                dlm_scan_timeout(ls);
 274                                dlm_scan_waiters(ls);
 275                                dlm_unlock_recovery(ls);
 276                        } else {
 277                                ls->ls_scan_time += HZ;
 278                        }
 279                        continue;
 280                }
 281                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 282        }
 283        return 0;
 284}
 285
 286static int dlm_scand_start(void)
 287{
 288        struct task_struct *p;
 289        int error = 0;
 290
 291        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 292        if (IS_ERR(p))
 293                error = PTR_ERR(p);
 294        else
 295                scand_task = p;
 296        return error;
 297}
 298
/* Stop the dlm_scand kthread started by dlm_scand_start().  Callers must
   only invoke this after a successful start (scand_task valid). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
 303
 304struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 305{
 306        struct dlm_ls *ls;
 307
 308        spin_lock(&lslist_lock);
 309
 310        list_for_each_entry(ls, &lslist, ls_list) {
 311                if (ls->ls_global_id == id) {
 312                        ls->ls_count++;
 313                        goto out;
 314                }
 315        }
 316        ls = NULL;
 317 out:
 318        spin_unlock(&lslist_lock);
 319        return ls;
 320}
 321
 322struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 323{
 324        struct dlm_ls *ls;
 325
 326        spin_lock(&lslist_lock);
 327        list_for_each_entry(ls, &lslist, ls_list) {
 328                if (ls->ls_local_handle == lockspace) {
 329                        ls->ls_count++;
 330                        goto out;
 331                }
 332        }
 333        ls = NULL;
 334 out:
 335        spin_unlock(&lslist_lock);
 336        return ls;
 337}
 338
 339struct dlm_ls *dlm_find_lockspace_device(int minor)
 340{
 341        struct dlm_ls *ls;
 342
 343        spin_lock(&lslist_lock);
 344        list_for_each_entry(ls, &lslist, ls_list) {
 345                if (ls->ls_device.minor == minor) {
 346                        ls->ls_count++;
 347                        goto out;
 348                }
 349        }
 350        ls = NULL;
 351 out:
 352        spin_unlock(&lslist_lock);
 353        return ls;
 354}
 355
/* Drop a reference taken by one of the dlm_find_lockspace_* lookups.
   remove_lockspace() waits for ls_count to reach zero before unlinking. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
 362
 363static void remove_lockspace(struct dlm_ls *ls)
 364{
 365        for (;;) {
 366                spin_lock(&lslist_lock);
 367                if (ls->ls_count == 0) {
 368                        WARN_ON(ls->ls_create_count != 0);
 369                        list_del(&ls->ls_list);
 370                        spin_unlock(&lslist_lock);
 371                        return;
 372                }
 373                spin_unlock(&lslist_lock);
 374                ssleep(1);
 375        }
 376}
 377
 378static int threads_start(void)
 379{
 380        int error;
 381
 382        error = dlm_scand_start();
 383        if (error) {
 384                log_print("cannot start dlm_scand thread %d", error);
 385                goto fail;
 386        }
 387
 388        /* Thread for sending/receiving messages for all lockspace's */
 389        error = dlm_lowcomms_start();
 390        if (error) {
 391                log_print("cannot start dlm lowcomms %d", error);
 392                goto scand_fail;
 393        }
 394
 395        return 0;
 396
 397 scand_fail:
 398        dlm_scand_stop();
 399 fail:
 400        return error;
 401}
 402
/* Stop the global threads started by threads_start(); called when the
   last lockspace goes away (or first creation fails). */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
 408
 409static int new_lockspace(const char *name, const char *cluster,
 410                         uint32_t flags, int lvblen,
 411                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 412                         int *ops_result, dlm_lockspace_t **lockspace)
 413{
 414        struct dlm_ls *ls;
 415        int i, size, error;
 416        int do_unreg = 0;
 417        int namelen = strlen(name);
 418
 419        if (namelen > DLM_LOCKSPACE_LEN)
 420                return -EINVAL;
 421
 422        if (!lvblen || (lvblen % 8))
 423                return -EINVAL;
 424
 425        if (!try_module_get(THIS_MODULE))
 426                return -EINVAL;
 427
 428        if (!dlm_user_daemon_available()) {
 429                log_print("dlm user daemon not available");
 430                error = -EUNATCH;
 431                goto out;
 432        }
 433
 434        if (ops && ops_result) {
 435                if (!dlm_config.ci_recover_callbacks)
 436                        *ops_result = -EOPNOTSUPP;
 437                else
 438                        *ops_result = 0;
 439        }
 440
 441        if (dlm_config.ci_recover_callbacks && cluster &&
 442            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 443                log_print("dlm cluster name %s mismatch %s",
 444                          dlm_config.ci_cluster_name, cluster);
 445                error = -EBADR;
 446                goto out;
 447        }
 448
 449        error = 0;
 450
 451        spin_lock(&lslist_lock);
 452        list_for_each_entry(ls, &lslist, ls_list) {
 453                WARN_ON(ls->ls_create_count <= 0);
 454                if (ls->ls_namelen != namelen)
 455                        continue;
 456                if (memcmp(ls->ls_name, name, namelen))
 457                        continue;
 458                if (flags & DLM_LSFL_NEWEXCL) {
 459                        error = -EEXIST;
 460                        break;
 461                }
 462                ls->ls_create_count++;
 463                *lockspace = ls;
 464                error = 1;
 465                break;
 466        }
 467        spin_unlock(&lslist_lock);
 468
 469        if (error)
 470                goto out;
 471
 472        error = -ENOMEM;
 473
 474        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 475        if (!ls)
 476                goto out;
 477        memcpy(ls->ls_name, name, namelen);
 478        ls->ls_namelen = namelen;
 479        ls->ls_lvblen = lvblen;
 480        ls->ls_count = 0;
 481        ls->ls_flags = 0;
 482        ls->ls_scan_time = jiffies;
 483
 484        if (ops && dlm_config.ci_recover_callbacks) {
 485                ls->ls_ops = ops;
 486                ls->ls_ops_arg = ops_arg;
 487        }
 488
 489        if (flags & DLM_LSFL_TIMEWARN)
 490                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 491
 492        /* ls_exflags are forced to match among nodes, and we don't
 493           need to require all nodes to have some flags set */
 494        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 495                                    DLM_LSFL_NEWEXCL));
 496
 497        size = dlm_config.ci_rsbtbl_size;
 498        ls->ls_rsbtbl_size = size;
 499
 500        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
 501        if (!ls->ls_rsbtbl)
 502                goto out_lsfree;
 503        for (i = 0; i < size; i++) {
 504                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 505                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 506                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 507        }
 508
 509        spin_lock_init(&ls->ls_remove_spin);
 510
 511        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 512                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 513                                                 GFP_KERNEL);
 514                if (!ls->ls_remove_names[i])
 515                        goto out_rsbtbl;
 516        }
 517
 518        idr_init(&ls->ls_lkbidr);
 519        spin_lock_init(&ls->ls_lkbidr_spin);
 520
 521        INIT_LIST_HEAD(&ls->ls_waiters);
 522        mutex_init(&ls->ls_waiters_mutex);
 523        INIT_LIST_HEAD(&ls->ls_orphans);
 524        mutex_init(&ls->ls_orphans_mutex);
 525        INIT_LIST_HEAD(&ls->ls_timeout);
 526        mutex_init(&ls->ls_timeout_mutex);
 527
 528        INIT_LIST_HEAD(&ls->ls_new_rsb);
 529        spin_lock_init(&ls->ls_new_rsb_spin);
 530
 531        INIT_LIST_HEAD(&ls->ls_nodes);
 532        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 533        ls->ls_num_nodes = 0;
 534        ls->ls_low_nodeid = 0;
 535        ls->ls_total_weight = 0;
 536        ls->ls_node_array = NULL;
 537
 538        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 539        ls->ls_stub_rsb.res_ls = ls;
 540
 541        ls->ls_debug_rsb_dentry = NULL;
 542        ls->ls_debug_waiters_dentry = NULL;
 543
 544        init_waitqueue_head(&ls->ls_uevent_wait);
 545        ls->ls_uevent_result = 0;
 546        init_completion(&ls->ls_members_done);
 547        ls->ls_members_result = -1;
 548
 549        mutex_init(&ls->ls_cb_mutex);
 550        INIT_LIST_HEAD(&ls->ls_cb_delay);
 551
 552        ls->ls_recoverd_task = NULL;
 553        mutex_init(&ls->ls_recoverd_active);
 554        spin_lock_init(&ls->ls_recover_lock);
 555        spin_lock_init(&ls->ls_rcom_spin);
 556        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 557        ls->ls_recover_status = 0;
 558        ls->ls_recover_seq = 0;
 559        ls->ls_recover_args = NULL;
 560        init_rwsem(&ls->ls_in_recovery);
 561        init_rwsem(&ls->ls_recv_active);
 562        INIT_LIST_HEAD(&ls->ls_requestqueue);
 563        mutex_init(&ls->ls_requestqueue_mutex);
 564        mutex_init(&ls->ls_clear_proc_locks);
 565
 566        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 567        if (!ls->ls_recover_buf)
 568                goto out_lkbidr;
 569
 570        ls->ls_slot = 0;
 571        ls->ls_num_slots = 0;
 572        ls->ls_slots_size = 0;
 573        ls->ls_slots = NULL;
 574
 575        INIT_LIST_HEAD(&ls->ls_recover_list);
 576        spin_lock_init(&ls->ls_recover_list_lock);
 577        idr_init(&ls->ls_recover_idr);
 578        spin_lock_init(&ls->ls_recover_idr_lock);
 579        ls->ls_recover_list_count = 0;
 580        ls->ls_local_handle = ls;
 581        init_waitqueue_head(&ls->ls_wait_general);
 582        INIT_LIST_HEAD(&ls->ls_root_list);
 583        init_rwsem(&ls->ls_root_sem);
 584
 585        down_write(&ls->ls_in_recovery);
 586
 587        spin_lock(&lslist_lock);
 588        ls->ls_create_count = 1;
 589        list_add(&ls->ls_list, &lslist);
 590        spin_unlock(&lslist_lock);
 591
 592        if (flags & DLM_LSFL_FS) {
 593                error = dlm_callback_start(ls);
 594                if (error) {
 595                        log_error(ls, "can't start dlm_callback %d", error);
 596                        goto out_delist;
 597                }
 598        }
 599
 600        /* needs to find ls in lslist */
 601        error = dlm_recoverd_start(ls);
 602        if (error) {
 603                log_error(ls, "can't start dlm_recoverd %d", error);
 604                goto out_callback;
 605        }
 606
 607        ls->ls_kobj.kset = dlm_kset;
 608        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 609                                     "%s", ls->ls_name);
 610        if (error)
 611                goto out_recoverd;
 612        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 613
 614        /* let kobject handle freeing of ls if there's an error */
 615        do_unreg = 1;
 616
 617        /* This uevent triggers dlm_controld in userspace to add us to the
 618           group of nodes that are members of this lockspace (managed by the
 619           cluster infrastructure.)  Once it's done that, it tells us who the
 620           current lockspace members are (via configfs) and then tells the
 621           lockspace to start running (via sysfs) in dlm_ls_start(). */
 622
 623        error = do_uevent(ls, 1);
 624        if (error)
 625                goto out_recoverd;
 626
 627        wait_for_completion(&ls->ls_members_done);
 628        error = ls->ls_members_result;
 629        if (error)
 630                goto out_members;
 631
 632        dlm_create_debug_file(ls);
 633
 634        log_debug(ls, "join complete");
 635        *lockspace = ls;
 636        return 0;
 637
 638 out_members:
 639        do_uevent(ls, 0);
 640        dlm_clear_members(ls);
 641        kfree(ls->ls_node_array);
 642 out_recoverd:
 643        dlm_recoverd_stop(ls);
 644 out_callback:
 645        dlm_callback_stop(ls);
 646 out_delist:
 647        spin_lock(&lslist_lock);
 648        list_del(&ls->ls_list);
 649        spin_unlock(&lslist_lock);
 650        idr_destroy(&ls->ls_recover_idr);
 651        kfree(ls->ls_recover_buf);
 652 out_lkbidr:
 653        idr_destroy(&ls->ls_lkbidr);
 654        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 655                if (ls->ls_remove_names[i])
 656                        kfree(ls->ls_remove_names[i]);
 657        }
 658 out_rsbtbl:
 659        vfree(ls->ls_rsbtbl);
 660 out_lsfree:
 661        if (do_unreg)
 662                kobject_put(&ls->ls_kobj);
 663        else
 664                kfree(ls);
 665 out:
 666        module_put(THIS_MODULE);
 667        return error;
 668}
 669
/* Public entry point for creating (or attaching to) a lockspace.  The
   first lockspace also starts the global dlm threads; if creation fails
   while no lockspaces exist, the threads are stopped again.  new_lockspace
   returning a positive value means an existing lockspace was reused, which
   is reported to the caller as success (0). */
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;		/* only new lockspaces count here;
					   reuse bumped ls_create_count */
	if (error > 0)
		error = 0;
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
 695
 696static int lkb_idr_is_local(int id, void *p, void *data)
 697{
 698        struct dlm_lkb *lkb = p;
 699
 700        if (!lkb->lkb_nodeid)
 701                return 1;
 702        return 0;
 703}
 704
/* idr_for_each callback: any lkb at all counts as busy (force == 0). */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
 709
 710static int lkb_idr_free(int id, void *p, void *data)
 711{
 712        struct dlm_lkb *lkb = p;
 713
 714        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 715                dlm_free_lvb(lkb->lkb_lvbptr);
 716
 717        dlm_free_lkb(lkb);
 718        return 0;
 719}
 720
 721/* NOTE: We check the lkbidr here rather than the resource table.
 722   This is because there may be LKBs queued as ASTs that have been unlinked
 723   from their RSBs and are pending deletion once the AST has been delivered */
 724
 725static int lockspace_busy(struct dlm_ls *ls, int force)
 726{
 727        int rv;
 728
 729        spin_lock(&ls->ls_lkbidr_spin);
 730        if (force == 0) {
 731                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 732        } else if (force == 1) {
 733                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 734        } else {
 735                rv = 0;
 736        }
 737        spin_unlock(&ls->ls_lkbidr_spin);
 738        return rv;
 739}
 740
/* Drop one create-count reference on the lockspace, and tear the whole
   thing down if it was the last.  Returns 0 on full removal, a positive
   remaining create count, or a negative errno (-EBUSY if lkbs block
   release at this force level).  The teardown ordering below is
   significant: threads are stopped before the structures they use are
   freed, and remove_lockspace() waits out any concurrent lookups. */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* tell dlm_controld we are leaving, except on forced shutdown */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	/* waits for ls_count references to drain, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_remove_all(&ls->ls_lkbidr);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	/* preallocated rsbs still parked on ls_new_rsb */
	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_debug(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	module_put(THIS_MODULE);
	return 0;
}
 839
 840/*
 841 * Called when a system has released all its locks and is not going to use the
 842 * lockspace any longer.  We free everything we're managing for this lockspace.
 843 * Remaining nodes will go through the recovery process as if we'd died.  The
 844 * lockspace must continue to function as usual, participating in recoveries,
 845 * until this returns.
 846 *
 847 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 849 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 850 * 2 - destroy lockspace regardless of LKBs
 851 * 3 - destroy lockspace as part of a forced shutdown
 852 */
 853
 854int dlm_release_lockspace(void *lockspace, int force)
 855{
 856        struct dlm_ls *ls;
 857        int error;
 858
 859        ls = dlm_find_lockspace_local(lockspace);
 860        if (!ls)
 861                return -EINVAL;
 862        dlm_put_lockspace(ls);
 863
 864        mutex_lock(&ls_lock);
 865        error = release_lockspace(ls, force);
 866        if (!error)
 867                ls_count--;
 868        if (!ls_count)
 869                threads_stop();
 870        mutex_unlock(&ls_lock);
 871
 872        return error;
 873}
 874
/* Called when the userland control daemon has gone away: stop every
   running lockspace.  lslist_lock is dropped around dlm_ls_stop() (the
   list may change while unlocked), so the scan restarts from the head
   after each stop; already-stopped entries are skipped via LSFL_RUNNING,
   guaranteeing termination. */
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;

 restart:
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
			continue;
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);
}
 891
 892