linux/fs/dlm/lock.c History
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include "dlm_internal.h"
  60#include <linux/dlm_device.h>
  61#include "memory.h"
  62#include "lowcomms.h"
  63#include "requestqueue.h"
  64#include "util.h"
  65#include "dir.h"
  66#include "member.h"
  67#include "lockspace.h"
  68#include "ast.h"
  69#include "lock.h"
  70#include "rcom.h"
  71#include "recover.h"
  72#include "lvb_table.h"
  73#include "user.h"
  74#include "config.h"
  75
  76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  83static int send_remove(struct dlm_rsb *r);
  84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  85static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  86static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
  87                                    struct dlm_message *ms);
  88static int receive_extralen(struct dlm_message *ms);
  89static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  90static void del_timeout(struct dlm_lkb *lkb);
  91
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */
  99
 100static const int __dlm_compat_matrix[8][8] = {
 101      /* UN NL CR CW PR PW EX PD */
 102        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 103        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 104        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 105        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 106        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 107        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 108        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 109        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 110};
 111
 112/*
 113 * This defines the direction of transfer of LVB data.
 114 * Granted mode is the row; requested mode is the column.
 115 * Usage: matrix[grmode+1][rqmode+1]
 116 * 1 = LVB is returned to the caller
 117 * 0 = LVB is written to the resource
 118 * -1 = nothing happens to the LVB
 119 */
 120
 121const int dlm_lvb_operations[8][8] = {
 122        /* UN   NL  CR  CW  PR  PW  EX  PD*/
 123        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
 124        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
 125        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
 126        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
 127        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
 128        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
 129        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
 130        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
 131};
 132
 133#define modes_compat(gr, rq) \
 134        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
 135
 136int dlm_modes_compat(int mode1, int mode2)
 137{
 138        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 139}
 140
 141/*
 142 * Compatibility matrix for conversions with QUECVT set.
 143 * Granted mode is the row; requested mode is the column.
 144 * Usage: matrix[grmode+1][rqmode+1]
 145 */
 146
 147static const int __quecvt_compat_matrix[8][8] = {
 148      /* UN NL CR CW PR PW EX PD */
 149        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
 150        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
 151        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
 152        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
 153        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
 154        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
 155        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
 156        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 157};
 158
 159void dlm_print_lkb(struct dlm_lkb *lkb)
 160{
 161        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 162               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
 163               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 164               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 165               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 166}
 167
 168static void dlm_print_rsb(struct dlm_rsb *r)
 169{
 170        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
 171               r->res_nodeid, r->res_flags, r->res_first_lkid,
 172               r->res_recover_locks_count, r->res_name);
 173}
 174
 175void dlm_dump_rsb(struct dlm_rsb *r)
 176{
 177        struct dlm_lkb *lkb;
 178
 179        dlm_print_rsb(r);
 180
 181        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 182               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 183        printk(KERN_ERR "rsb lookup list\n");
 184        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 185                dlm_print_lkb(lkb);
 186        printk(KERN_ERR "rsb grant queue:\n");
 187        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 188                dlm_print_lkb(lkb);
 189        printk(KERN_ERR "rsb convert queue:\n");
 190        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 191                dlm_print_lkb(lkb);
 192        printk(KERN_ERR "rsb wait queue:\n");
 193        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 194                dlm_print_lkb(lkb);
 195}
 196
 197/* Threads cannot use the lockspace while it's being recovered */
 198
 199static inline void dlm_lock_recovery(struct dlm_ls *ls)
 200{
 201        down_read(&ls->ls_in_recovery);
 202}
 203
 204void dlm_unlock_recovery(struct dlm_ls *ls)
 205{
 206        up_read(&ls->ls_in_recovery);
 207}
 208
 209int dlm_lock_recovery_try(struct dlm_ls *ls)
 210{
 211        return down_read_trylock(&ls->ls_in_recovery);
 212}
 213
 214static inline int can_be_queued(struct dlm_lkb *lkb)
 215{
 216        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 217}
 218
 219static inline int force_blocking_asts(struct dlm_lkb *lkb)
 220{
 221        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 222}
 223
 224static inline int is_demoted(struct dlm_lkb *lkb)
 225{
 226        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 227}
 228
 229static inline int is_altmode(struct dlm_lkb *lkb)
 230{
 231        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 232}
 233
 234static inline int is_granted(struct dlm_lkb *lkb)
 235{
 236        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 237}
 238
 239static inline int is_remote(struct dlm_rsb *r)
 240{
 241        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 242        return !!r->res_nodeid;
 243}
 244
 245static inline int is_process_copy(struct dlm_lkb *lkb)
 246{
 247        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 248}
 249
 250static inline int is_master_copy(struct dlm_lkb *lkb)
 251{
 252        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
 253                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 254        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 255}
 256
 257static inline int middle_conversion(struct dlm_lkb *lkb)
 258{
 259        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 260            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 261                return 1;
 262        return 0;
 263}
 264
 265static inline int down_conversion(struct dlm_lkb *lkb)
 266{
 267        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 268}
 269
 270static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 271{
 272        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 273}
 274
 275static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 276{
 277        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 278}
 279
 280static inline int is_overlap(struct dlm_lkb *lkb)
 281{
 282        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 283                                  DLM_IFL_OVERLAP_CANCEL));
 284}
 285
 286static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 287{
 288        if (is_master_copy(lkb))
 289                return;
 290
 291        del_timeout(lkb);
 292
 293        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 294
 295        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 296           timeout caused the cancel then return -ETIMEDOUT */
 297        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 298                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 299                rv = -ETIMEDOUT;
 300        }
 301
 302        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 303                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 304                rv = -EDEADLK;
 305        }
 306
 307        lkb->lkb_lksb->sb_status = rv;
 308        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 309
 310        dlm_add_ast(lkb, AST_COMP, 0);
 311}
 312
 313static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 314{
 315        queue_cast(r, lkb,
 316                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 317}
 318
 319static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 320{
 321        lkb->lkb_time_bast = ktime_get();
 322
 323        if (is_master_copy(lkb))
 324                send_bast(r, lkb, rqmode);
 325        else
 326                dlm_add_ast(lkb, AST_BAST, rqmode);
 327}
 328
 329/*
 330 * Basic operations on rsb's and lkb's
 331 */
 332
 333static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 334{
 335        struct dlm_rsb *r;
 336
 337        r = dlm_allocate_rsb(ls, len);
 338        if (!r)
 339                return NULL;
 340
 341        r->res_ls = ls;
 342        r->res_length = len;
 343        memcpy(r->res_name, name, len);
 344        mutex_init(&r->res_mutex);
 345
 346        INIT_LIST_HEAD(&r->res_lookup);
 347        INIT_LIST_HEAD(&r->res_grantqueue);
 348        INIT_LIST_HEAD(&r->res_convertqueue);
 349        INIT_LIST_HEAD(&r->res_waitqueue);
 350        INIT_LIST_HEAD(&r->res_root_list);
 351        INIT_LIST_HEAD(&r->res_recover_list);
 352
 353        return r;
 354}
 355
 356static int search_rsb_list(struct list_head *head, char *name, int len,
 357                           unsigned int flags, struct dlm_rsb **r_ret)
 358{
 359        struct dlm_rsb *r;
 360        int error = 0;
 361
 362        list_for_each_entry(r, head, res_hashchain) {
 363                if (len == r->res_length && !memcmp(name, r->res_name, len))
 364                        goto found;
 365        }
 366        *r_ret = NULL;
 367        return -EBADR;
 368
 369 found:
 370        if (r->res_nodeid && (flags & R_MASTER))
 371                error = -ENOTBLK;
 372        *r_ret = r;
 373        return error;
 374}
 375
 376static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 377                       unsigned int flags, struct dlm_rsb **r_ret)
 378{
 379        struct dlm_rsb *r;
 380        int error;
 381
 382        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
 383        if (!error) {
 384                kref_get(&r->res_ref);
 385                goto out;
 386        }
 387        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 388        if (error)
 389                goto out;
 390
 391        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
 392
 393        if (dlm_no_directory(ls))
 394                goto out;
 395
 396        if (r->res_nodeid == -1) {
 397                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 398                r->res_first_lkid = 0;
 399        } else if (r->res_nodeid > 0) {
 400                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 401                r->res_first_lkid = 0;
 402        } else {
 403                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
 404                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
 405        }
 406 out:
 407        *r_ret = r;
 408        return error;
 409}
 410
 411static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 412                      unsigned int flags, struct dlm_rsb **r_ret)
 413{
 414        int error;
 415        spin_lock(&ls->ls_rsbtbl[b].lock);
 416        error = _search_rsb(ls, name, len, b, flags, r_ret);
 417        spin_unlock(&ls->ls_rsbtbl[b].lock);
 418        return error;
 419}
 420
 421/*
 422 * Find rsb in rsbtbl and potentially create/add one
 423 *
 424 * Delaying the release of rsb's has a similar benefit to applications keeping
 425 * NL locks on an rsb, but without the guarantee that the cached master value
 426 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 427 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 428 * to excessive master lookups and removals if we don't delay the release.
 429 *
 430 * Searching for an rsb means looking through both the normal list and toss
 431 * list.  When found on the toss list the rsb is moved to the normal list with
 432 * ref count of 1; when found on normal list the ref count is incremented.
 433 */
 434
 435static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 436                    unsigned int flags, struct dlm_rsb **r_ret)
 437{
 438        struct dlm_rsb *r = NULL, *tmp;
 439        uint32_t hash, bucket;
 440        int error = -EINVAL;
 441
 442        if (namelen > DLM_RESNAME_MAXLEN)
 443                goto out;
 444
 445        if (dlm_no_directory(ls))
 446                flags |= R_CREATE;
 447
 448        error = 0;
 449        hash = jhash(name, namelen, 0);
 450        bucket = hash & (ls->ls_rsbtbl_size - 1);
 451
 452        error = search_rsb(ls, name, namelen, bucket, flags, &r);
 453        if (!error)
 454                goto out;
 455
 456        if (error == -EBADR && !(flags & R_CREATE))
 457                goto out;
 458
 459        /* the rsb was found but wasn't a master copy */
 460        if (error == -ENOTBLK)
 461                goto out;
 462
 463        error = -ENOMEM;
 464        r = create_rsb(ls, name, namelen);
 465        if (!r)
 466                goto out;
 467
 468        r->res_hash = hash;
 469        r->res_bucket = bucket;
 470        r->res_nodeid = -1;
 471        kref_init(&r->res_ref);
 472
 473        /* With no directory, the master can be set immediately */
 474        if (dlm_no_directory(ls)) {
 475                int nodeid = dlm_dir_nodeid(r);
 476                if (nodeid == dlm_our_nodeid())
 477                        nodeid = 0;
 478                r->res_nodeid = nodeid;
 479        }
 480
 481        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 482        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
 483        if (!error) {
 484                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 485                dlm_free_rsb(r);
 486                r = tmp;
 487                goto out;
 488        }
 489        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
 490        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 491        error = 0;
 492 out:
 493        *r_ret = r;
 494        return error;
 495}
 496
 497/* This is only called to add a reference when the code already holds
 498   a valid reference to the rsb, so there's no need for locking. */
 499
 500static inline void hold_rsb(struct dlm_rsb *r)
 501{
 502        kref_get(&r->res_ref);
 503}
 504
 505void dlm_hold_rsb(struct dlm_rsb *r)
 506{
 507        hold_rsb(r);
 508}
 509
 510static void toss_rsb(struct kref *kref)
 511{
 512        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 513        struct dlm_ls *ls = r->res_ls;
 514
 515        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 516        kref_init(&r->res_ref);
 517        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
 518        r->res_toss_time = jiffies;
 519        if (r->res_lvbptr) {
 520                dlm_free_lvb(r->res_lvbptr);
 521                r->res_lvbptr = NULL;
 522        }
 523}
 524
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
 527
 528static void put_rsb(struct dlm_rsb *r)
 529{
 530        struct dlm_ls *ls = r->res_ls;
 531        uint32_t bucket = r->res_bucket;
 532
 533        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 534        kref_put(&r->res_ref, toss_rsb);
 535        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 536}
 537
 538void dlm_put_rsb(struct dlm_rsb *r)
 539{
 540        put_rsb(r);
 541}
 542
 543/* See comment for unhold_lkb */
 544
 545static void unhold_rsb(struct dlm_rsb *r)
 546{
 547        int rv;
 548        rv = kref_put(&r->res_ref, toss_rsb);
 549        DLM_ASSERT(!rv, dlm_dump_rsb(r););
 550}
 551
 552static void kill_rsb(struct kref *kref)
 553{
 554        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 555
 556        /* All work is done after the return from kref_put() so we
 557           can release the write_lock before the remove and free. */
 558
 559        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 560        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 561        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 562        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 563        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 564        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 565}
 566
 567/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
 568   The rsb must exist as long as any lkb's for it do. */
 569
 570static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 571{
 572        hold_rsb(r);
 573        lkb->lkb_resource = r;
 574}
 575
 576static void detach_lkb(struct dlm_lkb *lkb)
 577{
 578        if (lkb->lkb_resource) {
 579                put_rsb(lkb->lkb_resource);
 580                lkb->lkb_resource = NULL;
 581        }
 582}
 583
 584static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 585{
 586        struct dlm_lkb *lkb, *tmp;
 587        uint32_t lkid = 0;
 588        uint16_t bucket;
 589
 590        lkb = dlm_allocate_lkb(ls);
 591        if (!lkb)
 592                return -ENOMEM;
 593
 594        lkb->lkb_nodeid = -1;
 595        lkb->lkb_grmode = DLM_LOCK_IV;
 596        kref_init(&lkb->lkb_ref);
 597        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 598        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 599        INIT_LIST_HEAD(&lkb->lkb_time_list);
 600
 601        get_random_bytes(&bucket, sizeof(bucket));
 602        bucket &= (ls->ls_lkbtbl_size - 1);
 603
 604        write_lock(&ls->ls_lkbtbl[bucket].lock);
 605
 606        /* counter can roll over so we must verify lkid is not in use */
 607
 608        while (lkid == 0) {
 609                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
 610
 611                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
 612                                    lkb_idtbl_list) {
 613                        if (tmp->lkb_id != lkid)
 614                                continue;
 615                        lkid = 0;
 616                        break;
 617                }
 618        }
 619
 620        lkb->lkb_id = lkid;
 621        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
 622        write_unlock(&ls->ls_lkbtbl[bucket].lock);
 623
 624        *lkb_ret = lkb;
 625        return 0;
 626}
 627
 628static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 629{
 630        struct dlm_lkb *lkb;
 631        uint16_t bucket = (lkid >> 16);
 632
 633        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
 634                if (lkb->lkb_id == lkid)
 635                        return lkb;
 636        }
 637        return NULL;
 638}
 639
 640static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 641{
 642        struct dlm_lkb *lkb;
 643        uint16_t bucket = (lkid >> 16);
 644
 645        if (bucket >= ls->ls_lkbtbl_size)
 646                return -EBADSLT;
 647
 648        read_lock(&ls->ls_lkbtbl[bucket].lock);
 649        lkb = __find_lkb(ls, lkid);
 650        if (lkb)
 651                kref_get(&lkb->lkb_ref);
 652        read_unlock(&ls->ls_lkbtbl[bucket].lock);
 653
 654        *lkb_ret = lkb;
 655        return lkb ? 0 : -ENOENT;
 656}
 657
 658static void kill_lkb(struct kref *kref)
 659{
 660        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
 661
 662        /* All work is done after the return from kref_put() so we
 663           can release the write_lock before the detach_lkb */
 664
 665        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 666}
 667
 668/* __put_lkb() is used when an lkb may not have an rsb attached to
 669   it so we need to provide the lockspace explicitly */
 670
 671static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 672{
 673        uint16_t bucket = (lkb->lkb_id >> 16);
 674
 675        write_lock(&ls->ls_lkbtbl[bucket].lock);
 676        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
 677                list_del(&lkb->lkb_idtbl_list);
 678                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 679
 680                detach_lkb(lkb);
 681
 682                /* for local/process lkbs, lvbptr points to caller's lksb */
 683                if (lkb->lkb_lvbptr && is_master_copy(lkb))
 684                        dlm_free_lvb(lkb->lkb_lvbptr);
 685                dlm_free_lkb(lkb);
 686                return 1;
 687        } else {
 688                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 689                return 0;
 690        }
 691}
 692
 693int dlm_put_lkb(struct dlm_lkb *lkb)
 694{
 695        struct dlm_ls *ls;
 696
 697        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
 698        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
 699
 700        ls = lkb->lkb_resource->res_ls;
 701        return __put_lkb(ls, lkb);
 702}
 703
 704/* This is only called to add a reference when the code already holds
 705   a valid reference to the lkb, so there's no need for locking. */
 706
 707static inline void hold_lkb(struct dlm_lkb *lkb)
 708{
 709        kref_get(&lkb->lkb_ref);
 710}
 711
 712/* This is called when we need to remove a reference and are certain
 713   it's not the last ref.  e.g. del_lkb is always called between a
 714   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
 715   put_lkb would work fine, but would involve unnecessary locking */
 716
 717static inline void unhold_lkb(struct dlm_lkb *lkb)
 718{
 719        int rv;
 720        rv = kref_put(&lkb->lkb_ref, kill_lkb);
 721        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
 722}
 723
 724static void lkb_add_ordered(struct list_head *new, struct list_head *head,
 725                            int mode)
 726{
 727        struct dlm_lkb *lkb = NULL;
 728
 729        list_for_each_entry(lkb, head, lkb_statequeue)
 730                if (lkb->lkb_rqmode < mode)
 731                        break;
 732
 733        if (!lkb)
 734                list_add_tail(new, head);
 735        else
 736                __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 737}
 738
 739/* add/remove lkb to rsb's grant/convert/wait queue */
 740
 741static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 742{
 743        kref_get(&lkb->lkb_ref);
 744
 745        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 746
 747        lkb->lkb_timestamp = ktime_get();
 748
 749        lkb->lkb_status = status;
 750
 751        switch (status) {
 752        case DLM_LKSTS_WAITING:
 753                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 754                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 755                else
 756                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 757                break;
 758        case DLM_LKSTS_GRANTED:
 759                /* convention says granted locks kept in order of grmode */
 760                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 761                                lkb->lkb_grmode);
 762                break;
 763        case DLM_LKSTS_CONVERT:
 764                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 765                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 766                else
 767                        list_add_tail(&lkb->lkb_statequeue,
 768                                      &r->res_convertqueue);
 769                break;
 770        default:
 771                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
 772        }
 773}
 774
 775static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 776{
 777        lkb->lkb_status = 0;
 778        list_del(&lkb->lkb_statequeue);
 779        unhold_lkb(lkb);
 780}
 781
 782static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
 783{
 784        hold_lkb(lkb);
 785        del_lkb(r, lkb);
 786        add_lkb(r, lkb, sts);
 787        unhold_lkb(lkb);
 788}
 789
 790static int msg_reply_type(int mstype)
 791{
 792        switch (mstype) {
 793        case DLM_MSG_REQUEST:
 794                return DLM_MSG_REQUEST_REPLY;
 795        case DLM_MSG_CONVERT:
 796                return DLM_MSG_CONVERT_REPLY;
 797        case DLM_MSG_UNLOCK:
 798                return DLM_MSG_UNLOCK_REPLY;
 799        case DLM_MSG_CANCEL:
 800                return DLM_MSG_CANCEL_REPLY;
 801        case DLM_MSG_LOOKUP:
 802                return DLM_MSG_LOOKUP_REPLY;
 803        }
 804        return -1;
 805}
 806
 807/* add/remove lkb from global waiters list of lkb's waiting for
 808   a reply from a remote node */
 809
 810static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 811{
 812        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 813        int error = 0;
 814
 815        mutex_lock(&ls->ls_waiters_mutex);
 816
 817        if (is_overlap_unlock(lkb) ||
 818            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
 819                error = -EINVAL;
 820                goto out;
 821        }
 822
 823        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
 824                switch (mstype) {
 825                case DLM_MSG_UNLOCK:
 826                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
 827                        break;
 828                case DLM_MSG_CANCEL:
 829                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
 830                        break;
 831                default:
 832                        error = -EBUSY;
 833                        goto out;
 834                }
 835                lkb->lkb_wait_count++;
 836                hold_lkb(lkb);
 837
 838                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
 839                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
 840                          lkb->lkb_wait_count, lkb->lkb_flags);
 841                goto out;
 842        }
 843
 844        DLM_ASSERT(!lkb->lkb_wait_count,
 845                   dlm_print_lkb(lkb);
 846                   printk("wait_count %d\n", lkb->lkb_wait_count););
 847
 848        lkb->lkb_wait_count++;
 849        lkb->lkb_wait_type = mstype;
 850        hold_lkb(lkb);
 851        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 852 out:
 853        if (error)
 854                log_error(ls, "addwait error %x %d flags %x %d %d %s",
 855                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
 856                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 857        mutex_unlock(&ls->ls_waiters_mutex);
 858        return error;
 859}
 860
 861/* We clear the RESEND flag because we might be taking an lkb off the waiters
 862   list as part of process_requestqueue (e.g. a lookup that has an optimized
 863   request reply on the requestqueue) between dlm_recover_waiters_pre() which
 864   set RESEND and dlm_recover_waiters_post() */
 865
 866static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 867                                struct dlm_message *ms)
 868{
 869        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 870        int overlap_done = 0;
 871
 872        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
 873                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
 874                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
 875                overlap_done = 1;
 876                goto out_del;
 877        }
 878
 879        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
 880                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
 881                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 882                overlap_done = 1;
 883                goto out_del;
 884        }
 885
 886        /* Cancel state was preemptively cleared by a successful convert,
 887           see next comment, nothing to do. */
 888
 889        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
 890            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
 891                log_debug(ls, "remwait %x cancel_reply wait_type %d",
 892                          lkb->lkb_id, lkb->lkb_wait_type);
 893                return -1;
 894        }
 895
 896        /* Remove for the convert reply, and premptively remove for the
 897           cancel reply.  A convert has been granted while there's still
 898           an outstanding cancel on it (the cancel is moot and the result
 899           in the cancel reply should be 0).  We preempt the cancel reply
 900           because the app gets the convert result and then can follow up
 901           with another op, like convert.  This subsequent op would see the
 902           lingering state of the cancel and fail with -EBUSY. */
 903
 904        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
 905            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
 906            is_overlap_cancel(lkb) && ms && !ms->m_result) {
 907                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
 908                          lkb->lkb_id);
 909                lkb->lkb_wait_type = 0;
 910                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 911                lkb->lkb_wait_count--;
 912                goto out_del;
 913        }
 914
 915        /* N.B. type of reply may not always correspond to type of original
 916           msg due to lookup->request optimization, verify others? */
 917
 918        if (lkb->lkb_wait_type) {
 919                lkb->lkb_wait_type = 0;
 920                goto out_del;
 921        }
 922
 923        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
 924                  lkb->lkb_id, mstype, lkb->lkb_flags);
 925        return -1;
 926
 927 out_del:
 928        /* the force-unlock/cancel has completed and we haven't recvd a reply
 929           to the op that was in progress prior to the unlock/cancel; we
 930           give up on any reply to the earlier op.  FIXME: not sure when/how
 931           this would happen */
 932
 933        if (overlap_done && lkb->lkb_wait_type) {
 934                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
 935                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
 936                lkb->lkb_wait_count--;
 937                lkb->lkb_wait_type = 0;
 938        }
 939
 940        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 941
 942        lkb->lkb_flags &= ~DLM_IFL_RESEND;
 943        lkb->lkb_wait_count--;
 944        if (!lkb->lkb_wait_count)
 945                list_del_init(&lkb->lkb_wait_reply);
 946        unhold_lkb(lkb);
 947        return 0;
 948}
 949
 950static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 951{
 952        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 953        int error;
 954
 955        mutex_lock(&ls->ls_waiters_mutex);
 956        error = _remove_from_waiters(lkb, mstype, NULL);
 957        mutex_unlock(&ls->ls_waiters_mutex);
 958        return error;
 959}
 960
 961/* Handles situations where we might be processing a "fake" or "stub" reply in
 962   which we can't try to take waiters_mutex again. */
 963
 964static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 965{
 966        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 967        int error;
 968
 969        if (ms != &ls->ls_stub_ms)
 970                mutex_lock(&ls->ls_waiters_mutex);
 971        error = _remove_from_waiters(lkb, ms->m_type, ms);
 972        if (ms != &ls->ls_stub_ms)
 973                mutex_unlock(&ls->ls_waiters_mutex);
 974        return error;
 975}
 976
 977static void dir_remove(struct dlm_rsb *r)
 978{
 979        int to_nodeid;
 980
 981        if (dlm_no_directory(r->res_ls))
 982                return;
 983
 984        to_nodeid = dlm_dir_nodeid(r);
 985        if (to_nodeid != dlm_our_nodeid())
 986                send_remove(r);
 987        else
 988                dlm_dir_remove_entry(r->res_ls, to_nodeid,
 989                                     r->res_name, r->res_length);
 990}
 991
 992/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
 993   found since they are in order of newest to oldest? */
 994
 995static int shrink_bucket(struct dlm_ls *ls, int b)
 996{
 997        struct dlm_rsb *r;
 998        int count = 0, found;
 999
1000        for (;;) {
1001                found = 0;
1002                spin_lock(&ls->ls_rsbtbl[b].lock);
1003                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                            res_hashchain) {
1005                        if (!time_after_eq(jiffies, r->res_toss_time +
1006                                           dlm_config.ci_toss_secs * HZ))
1007                                continue;
1008                        found = 1;
1009                        break;
1010                }
1011
1012                if (!found) {
1013                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                        break;
1015                }
1016
1017                if (kref_put(&r->res_ref, kill_rsb)) {
1018                        list_del(&r->res_hashchain);
1019                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                        if (is_master(r))
1022                                dir_remove(r);
1023                        dlm_free_rsb(r);
1024                        count++;
1025                } else {
1026                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                        log_error(ls, "tossed rsb in use %s", r->res_name);
1028                }
1029        }
1030
1031        return count;
1032}
1033
1034void dlm_scan_rsbs(struct dlm_ls *ls)
1035{
1036        int i;
1037
1038        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                shrink_bucket(ls, i);
1040                if (dlm_locking_stopped(ls))
1041                        break;
1042                cond_resched();
1043        }
1044}
1045
1046static void add_timeout(struct dlm_lkb *lkb)
1047{
1048        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050        if (is_master_copy(lkb))
1051                return;
1052
1053        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                goto add_it;
1057        }
1058        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                goto add_it;
1060        return;
1061
1062 add_it:
1063        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064        mutex_lock(&ls->ls_timeout_mutex);
1065        hold_lkb(lkb);
1066        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067        mutex_unlock(&ls->ls_timeout_mutex);
1068}
1069
1070static void del_timeout(struct dlm_lkb *lkb)
1071{
1072        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074        mutex_lock(&ls->ls_timeout_mutex);
1075        if (!list_empty(&lkb->lkb_time_list)) {
1076                list_del_init(&lkb->lkb_time_list);
1077                unhold_lkb(lkb);
1078        }
1079        mutex_unlock(&ls->ls_timeout_mutex);
1080}
1081
1082/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084   and then lock rsb because of lock ordering in add_timeout.  We may need
1085   to specify some special timeout-related bits in the lkb that are just to
1086   be accessed under the timeout_mutex. */
1087
1088void dlm_scan_timeout(struct dlm_ls *ls)
1089{
1090        struct dlm_rsb *r;
1091        struct dlm_lkb *lkb;
1092        int do_cancel, do_warn;
1093        s64 wait_us;
1094
1095        for (;;) {
1096                if (dlm_locking_stopped(ls))
1097                        break;
1098
1099                do_cancel = 0;
1100                do_warn = 0;
1101                mutex_lock(&ls->ls_timeout_mutex);
1102                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                        lkb->lkb_timestamp));
1106
1107                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                            wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                do_cancel = 1;
1110
1111                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                do_warn = 1;
1114
1115                        if (!do_cancel && !do_warn)
1116                                continue;
1117                        hold_lkb(lkb);
1118                        break;
1119                }
1120                mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                if (!do_cancel && !do_warn)
1123                        break;
1124
1125                r = lkb->lkb_resource;
1126                hold_rsb(r);
1127                lock_rsb(r);
1128
1129                if (do_warn) {
1130                        /* clear flag so we only warn once */
1131                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                del_timeout(lkb);
1134                        dlm_timeout_warn(lkb);
1135                }
1136
1137                if (do_cancel) {
1138                        log_debug(ls, "timeout cancel %x node %d %s",
1139                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                        del_timeout(lkb);
1143                        _cancel_lock(r, lkb);
1144                }
1145
1146                unlock_rsb(r);
1147                unhold_rsb(r);
1148                dlm_put_lkb(lkb);
1149        }
1150}
1151
1152/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153   dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155void dlm_adjust_timeouts(struct dlm_ls *ls)
1156{
1157        struct dlm_lkb *lkb;
1158        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160        ls->ls_recover_begin = 0;
1161        mutex_lock(&ls->ls_timeout_mutex);
1162        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164        mutex_unlock(&ls->ls_timeout_mutex);
1165}
1166
1167/* lkb is master or local copy */
1168
1169static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170{
1171        int b, len = r->res_ls->ls_lvblen;
1172
1173        /* b=1 lvb returned to caller
1174           b=0 lvb written to rsb or invalidated
1175           b=-1 do nothing */
1176
1177        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179        if (b == 1) {
1180                if (!lkb->lkb_lvbptr)
1181                        return;
1182
1183                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                        return;
1185
1186                if (!r->res_lvbptr)
1187                        return;
1188
1189                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192        } else if (b == 0) {
1193                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                        rsb_set_flag(r, RSB_VALNOTVALID);
1195                        return;
1196                }
1197
1198                if (!lkb->lkb_lvbptr)
1199                        return;
1200
1201                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                        return;
1203
1204                if (!r->res_lvbptr)
1205                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                if (!r->res_lvbptr)
1208                        return;
1209
1210                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                r->res_lvbseq++;
1212                lkb->lkb_lvbseq = r->res_lvbseq;
1213                rsb_clear_flag(r, RSB_VALNOTVALID);
1214        }
1215
1216        if (rsb_flag(r, RSB_VALNOTVALID))
1217                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218}
1219
1220static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221{
1222        if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                return;
1224
1225        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                rsb_set_flag(r, RSB_VALNOTVALID);
1227                return;
1228        }
1229
1230        if (!lkb->lkb_lvbptr)
1231                return;
1232
1233        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                return;
1235
1236        if (!r->res_lvbptr)
1237                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239        if (!r->res_lvbptr)
1240                return;
1241
1242        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243        r->res_lvbseq++;
1244        rsb_clear_flag(r, RSB_VALNOTVALID);
1245}
1246
1247/* lkb is process copy (pc) */
1248
1249static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                            struct dlm_message *ms)
1251{
1252        int b;
1253
1254        if (!lkb->lkb_lvbptr)
1255                return;
1256
1257        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                return;
1259
1260        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261        if (b == 1) {
1262                int len = receive_extralen(ms);
1263                if (len > DLM_RESNAME_MAXLEN)
1264                        len = DLM_RESNAME_MAXLEN;
1265                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                lkb->lkb_lvbseq = ms->m_lvbseq;
1267        }
1268}
1269
1270/* Manipulate lkb's on rsb's convert/granted/waiting queues
1271   remove_lock -- used for unlock, removes lkb from granted
1272   revert_lock -- used for cancel, moves lkb from convert to granted
1273   grant_lock  -- used for request and convert, adds lkb to granted or
1274                  moves lkb from convert or waiting to granted
1275
1276   Each of these is used for master or local copy lkb's.  There is
1277   also a _pc() variation used to make the corresponding change on
1278   a process copy (pc) lkb. */
1279
1280static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281{
1282        del_lkb(r, lkb);
1283        lkb->lkb_grmode = DLM_LOCK_IV;
1284        /* this unhold undoes the original ref from create_lkb()
1285           so this leads to the lkb being freed */
1286        unhold_lkb(lkb);
1287}
1288
/* Remove a master or local copy lock: do the unlock-time lvb handling
   first, then remove the lkb from the rsb. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);

        _remove_lock(r, lkb);
}
1294
/* Remove a process copy lock; unlike remove_lock() no lvb handling is
   done here. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1299
1300/* returns: 0 did nothing
1301            1 moved lock to granted
1302           -1 removed lock */
1303
1304static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305{
1306        int rv = 0;
1307
1308        lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310        switch (lkb->lkb_status) {
1311        case DLM_LKSTS_GRANTED:
1312                break;
1313        case DLM_LKSTS_CONVERT:
1314                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                rv = 1;
1316                break;
1317        case DLM_LKSTS_WAITING:
1318                del_lkb(r, lkb);
1319                lkb->lkb_grmode = DLM_LOCK_IV;
1320                /* this unhold undoes the original ref from create_lkb()
1321                   so this leads to the lkb being freed */
1322                unhold_lkb(lkb);
1323                rv = -1;
1324                break;
1325        default:
1326                log_print("invalid status for revert %d", lkb->lkb_status);
1327        }
1328        return rv;
1329}
1330
/* Process copy variant of revert_lock(); the handling and the return
   values are the same. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = revert_lock(r, lkb);

        return rv;
}
1335
1336static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337{
1338        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                lkb->lkb_grmode = lkb->lkb_rqmode;
1340                if (lkb->lkb_status)
1341                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                else
1343                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344        }
1345
1346        lkb->lkb_rqmode = DLM_LOCK_IV;
1347}
1348
1349static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350{
1351        set_lvb_lock(r, lkb);
1352        _grant_lock(r, lkb);
1353        lkb->lkb_highbast = 0;
1354}
1355
/* Grant a process copy lock: take the lvb from the message ms, then move
   the lkb to the granted state. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);

        _grant_lock(r, lkb);
}
1362
/* Grant a lock that had been queued.  Called by grant_pending_locks(),
   which means that besides granting the lock an async grant message must
   be sent to the requesting node when the lkb belongs to a remote node
   (master copy); for a local lkb the completion callback is queued. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb))
                queue_cast(r, lkb, 0);
        else
                send_grant(r, lkb);
}
1375
1376/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377   change the granted/requested modes.  We're munging things accordingly in
1378   the process copy.
1379   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380   conversion deadlock
1381   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382   compatible with other granted locks */
1383
1384static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385{
1386        if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                log_print("munge_demoted %x invalid reply type %d",
1388                          lkb->lkb_id, ms->m_type);
1389                return;
1390        }
1391
1392        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                return;
1396        }
1397
1398        lkb->lkb_grmode = DLM_LOCK_NL;
1399}
1400
1401static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402{
1403        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404            ms->m_type != DLM_MSG_GRANT) {
1405                log_print("munge_altmode %x invalid reply type %d",
1406                          lkb->lkb_id, ms->m_type);
1407                return;
1408        }
1409
1410        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                lkb->lkb_rqmode = DLM_LOCK_PR;
1412        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                lkb->lkb_rqmode = DLM_LOCK_CW;
1414        else {
1415                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                dlm_print_lkb(lkb);
1417        }
1418}
1419
1420static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421{
1422        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                           lkb_statequeue);
1424        if (lkb->lkb_id == first->lkb_id)
1425                return 1;
1426
1427        return 0;
1428}
1429
1430/* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433{
1434        struct dlm_lkb *this;
1435
1436        list_for_each_entry(this, head, lkb_statequeue) {
1437                if (this == lkb)
1438                        continue;
1439                if (!modes_compat(this, lkb))
1440                        return 1;
1441        }
1442        return 0;
1443}
1444
1445/*
1446 * "A conversion deadlock arises with a pair of lock requests in the converting
1447 * queue for one resource.  The granted mode of each lock blocks the requested
1448 * mode of the other lock."
1449 *
1450 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451 * convert queue from being granted, then deadlk/demote lkb.
1452 *
1453 * Example:
1454 * Granted Queue: empty
1455 * Convert Queue: NL->EX (first lock)
1456 *                PR->EX (second lock)
1457 *
1458 * The first lock can't be granted because of the granted mode of the second
1459 * lock and the second lock can't be granted because it's not first in the
1460 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462 * flag set and return DEMOTED in the lksb flags.
1463 *
1464 * Originally, this function detected conv-deadlk in a more limited scope:
1465 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466 * - if lkb1 was the first entry in the queue (not just earlier), and was
1467 *   blocked by the granted mode of lkb2, and there was nothing on the
1468 *   granted queue preventing lkb1 from being granted immediately, i.e.
1469 *   lkb2 was the only thing preventing lkb1 from being granted.
1470 *
1471 * That second condition meant we'd only say there was conv-deadlk if
1472 * resolving it (by demotion) would lead to the first lock on the convert
1473 * queue being granted right away.  It allowed conversion deadlocks to exist
1474 * between locks on the convert queue while they couldn't be granted anyway.
1475 *
1476 * Now, we detect and take action on conversion deadlocks immediately when
1477 * they're created, even if they may not be immediately consequential.  If
1478 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479 * mode that would prevent lkb1's conversion from being granted, we do a
1480 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481 * I think this means that the lkb_is_ahead condition below should always
1482 * be zero, i.e. there will never be conv-deadlk between two locks that are
1483 * both already on the convert queue.
1484 */
1485
1486static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487{
1488        struct dlm_lkb *lkb1;
1489        int lkb_is_ahead = 0;
1490
1491        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                if (lkb1 == lkb2) {
1493                        lkb_is_ahead = 1;
1494                        continue;
1495                }
1496
1497                if (!lkb_is_ahead) {
1498                        if (!modes_compat(lkb2, lkb1))
1499                                return 1;
1500                } else {
1501                        if (!modes_compat(lkb2, lkb1) &&
1502                            !modes_compat(lkb1, lkb2))
1503                                return 1;
1504                }
1505        }
1506        return 0;
1507}
1508
1509/*
1510 * Return 1 if the lock can be granted, 0 otherwise.
1511 * Also detect and resolve conversion deadlocks.
1512 *
1513 * lkb is the lock to be granted
1514 *
1515 * now is 1 if the function is being called in the context of the
1516 * immediate request, it is 0 if called later, after the lock has been
1517 * queued.
1518 *
1519 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520 */
1521
1522static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523{
1524        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526        /*
1527         * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528         * a new request for a NL mode lock being blocked.
1529         *
1530         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531         * request, then it would be granted.  In essence, the use of this flag
1532         * tells the Lock Manager to expedite theis request by not considering
1533         * what may be in the CONVERTING or WAITING queues...  As of this
1534         * writing, the EXPEDITE flag can be used only with new requests for NL
1535         * mode locks.  This flag is not valid for conversion requests.
1536         *
1537         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538         * conversion or used with a non-NL requested mode.  We also know an
1539         * EXPEDITE request is always granted immediately, so now must always
1540         * be 1.  The full condition to grant an expedite request: (now &&
1541         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542         * therefore be shortened to just checking the flag.
1543         */
1544
1545        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                return 1;
1547
1548        /*
1549         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550         * added to the remaining conditions.
1551         */
1552
1553        if (queue_conflict(&r->res_grantqueue, lkb))
1554                goto out;
1555
1556        /*
1557         * 6-3: By default, a conversion request is immediately granted if the
1558         * requested mode is compatible with the modes of all other granted
1559         * locks
1560         */
1561
1562        if (queue_conflict(&r->res_convertqueue, lkb))
1563                goto out;
1564
1565        /*
1566         * 6-5: But the default algorithm for deciding whether to grant or
1567         * queue conversion requests does not by itself guarantee that such
1568         * requests are serviced on a "first come first serve" basis.  This, in
1569         * turn, can lead to a phenomenon known as "indefinate postponement".
1570         *
1571         * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572         * the system service employed to request a lock conversion.  This flag
1573         * forces certain conversion requests to be queued, even if they are
1574         * compatible with the granted modes of other locks on the same
1575         * resource.  Thus, the use of this flag results in conversion requests
1576         * being ordered on a "first come first servce" basis.
1577         *
1578         * DCT: This condition is all about new conversions being able to occur
1579         * "in place" while the lock remains on the granted queue (assuming
1580         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581         * doesn't _have_ to go onto the convert queue where it's processed in
1582         * order.  The "now" variable is necessary to distinguish converts
1583         * being received and processed for the first time now, because once a
1584         * convert is moved to the conversion queue the condition below applies
1585         * requiring fifo granting.
1586         */
1587
1588        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                return 1;
1590
1591        /*
1592         * The NOORDER flag is set to avoid the standard vms rules on grant
1593         * order.
1594         */
1595
1596        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                return 1;
1598
1599        /*
1600         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601         * granted until all other conversion requests ahead of it are granted
1602         * and/or canceled.
1603         */
1604
1605        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                return 1;
1607
1608        /*
1609         * 6-4: By default, a new request is immediately granted only if all
1610         * three of the following conditions are satisfied when the request is
1611         * issued:
1612         * - The queue of ungranted conversion requests for the resource is
1613         *   empty.
1614         * - The queue of ungranted new requests for the resource is empty.
1615         * - The mode of the new request is compatible with the most
1616         *   restrictive mode of all granted locks on the resource.
1617         */
1618
1619        if (now && !conv && list_empty(&r->res_convertqueue) &&
1620            list_empty(&r->res_waitqueue))
1621                return 1;
1622
1623        /*
1624         * 6-4: Once a lock request is in the queue of ungranted new requests,
1625         * it cannot be granted until the queue of ungranted conversion
1626         * requests is empty, all ungranted new requests ahead of it are
1627         * granted and/or canceled, and it is compatible with the granted mode
1628         * of the most restrictive lock granted on the resource.
1629         */
1630
1631        if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632            first_in_list(lkb, &r->res_waitqueue))
1633                return 1;
1634 out:
1635        return 0;
1636}
1637
1638static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                          int *err)
1640{
1641        int rv;
1642        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645        if (err)
1646                *err = 0;
1647
1648        rv = _can_be_granted(r, lkb, now);
1649        if (rv)
1650                goto out;
1651
1652        /*
1653         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655         * cancels one of the locks.
1656         */
1657
1658        if (is_convert && can_be_queued(lkb) &&
1659            conversion_deadlock_detect(r, lkb)) {
1660                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                        lkb->lkb_grmode = DLM_LOCK_NL;
1662                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                        if (err)
1665                                *err = -EDEADLK;
1666                        else {
1667                                log_print("can_be_granted deadlock %x now %d",
1668                                          lkb->lkb_id, now);
1669                                dlm_dump_rsb(r);
1670                        }
1671                }
1672                goto out;
1673        }
1674
1675        /*
1676         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677         * to grant a request in a mode other than the normal rqmode.  It's a
1678         * simple way to provide a big optimization to applications that can
1679         * use them.
1680         */
1681
1682        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                alt = DLM_LOCK_PR;
1684        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                alt = DLM_LOCK_CW;
1686
1687        if (alt) {
1688                lkb->lkb_rqmode = alt;
1689                rv = _can_be_granted(r, lkb, now);
1690                if (rv)
1691                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                else
1693                        lkb->lkb_rqmode = rqmode;
1694        }
1695 out:
1696        return rv;
1697}
1698
1699/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700   for locks pending on the convert list.  Once verified (watch for these
1701   log_prints), we should be able to just call _can_be_granted() and not
1702   bother with the demote/deadlk cases here (and there's no easy way to deal
1703   with a deadlk here, we'd have to generate something like grant_lock with
1704   the deadlk error.) */
1705
1706/* Returns the highest requested mode of all blocked conversions; sets
1707   cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710{
1711        struct dlm_lkb *lkb, *s;
1712        int hi, demoted, quit, grant_restart, demote_restart;
1713        int deadlk;
1714
1715        quit = 0;
1716 restart:
1717        grant_restart = 0;
1718        demote_restart = 0;
1719        hi = DLM_LOCK_IV;
1720
1721        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                demoted = is_demoted(lkb);
1723                deadlk = 0;
1724
1725                if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                        grant_lock_pending(r, lkb);
1727                        grant_restart = 1;
1728                        continue;
1729                }
1730
1731                if (!demoted && is_demoted(lkb)) {
1732                        log_print("WARN: pending demoted %x node %d %s",
1733                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                        demote_restart = 1;
1735                        continue;
1736                }
1737
1738                if (deadlk) {
1739                        log_print("WARN: pending deadlock %x node %d %s",
1740                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                        dlm_dump_rsb(r);
1742                        continue;
1743                }
1744
1745                hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                        *cw = 1;
1749        }
1750
1751        if (grant_restart)
1752                goto restart;
1753        if (demote_restart && !quit) {
1754                quit = 1;
1755                goto restart;
1756        }
1757
1758        return max_t(int, high, hi);
1759}
1760
1761static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762{
1763        struct dlm_lkb *lkb, *s;
1764
1765        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                if (can_be_granted(r, lkb, 0, NULL))
1767                        grant_lock_pending(r, lkb);
1768                else {
1769                        high = max_t(int, lkb->lkb_rqmode, high);
1770                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                *cw = 1;
1772                }
1773        }
1774
1775        return high;
1776}
1777
1778/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779   on either the convert or waiting queue.
1780   high is the largest rqmode of all locks blocked on the convert or
1781   waiting queue. */
1782
1783static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784{
1785        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                if (gr->lkb_highbast < DLM_LOCK_EX)
1787                        return 1;
1788                return 0;
1789        }
1790
1791        if (gr->lkb_highbast < high &&
1792            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                return 1;
1794        return 0;
1795}
1796
1797static void grant_pending_locks(struct dlm_rsb *r)
1798{
1799        struct dlm_lkb *lkb, *s;
1800        int high = DLM_LOCK_IV;
1801        int cw = 0;
1802
1803        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805        high = grant_pending_convert(r, high, &cw);
1806        high = grant_pending_wait(r, high, &cw);
1807
1808        if (high == DLM_LOCK_IV)
1809                return;
1810
1811        /*
1812         * If there are locks left on the wait/convert queue then send blocking
1813         * ASTs to granted locks based on the largest requested mode (high)
1814         * found above.
1815         */
1816
1817        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                        if (cw && high == DLM_LOCK_PR &&
1820                            lkb->lkb_grmode == DLM_LOCK_PR)
1821                                queue_bast(r, lkb, DLM_LOCK_CW);
1822                        else
1823                                queue_bast(r, lkb, high);
1824                        lkb->lkb_highbast = high;
1825                }
1826        }
1827}
1828
1829static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830{
1831        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                if (gr->lkb_highbast < DLM_LOCK_EX)
1834                        return 1;
1835                return 0;
1836        }
1837
1838        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                return 1;
1840        return 0;
1841}
1842
1843static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                            struct dlm_lkb *lkb)
1845{
1846        struct dlm_lkb *gr;
1847
1848        list_for_each_entry(gr, head, lkb_statequeue) {
1849                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                        queue_bast(r, gr, lkb->lkb_rqmode);
1851                        gr->lkb_highbast = lkb->lkb_rqmode;
1852                }
1853        }
1854}
1855
1856static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857{
1858        send_bast_queue(r, &r->res_grantqueue, lkb);
1859}
1860
1861static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862{
1863        send_bast_queue(r, &r->res_grantqueue, lkb);
1864        send_bast_queue(r, &r->res_convertqueue, lkb);
1865}
1866
1867/* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869   The purpose of this function is to set the nodeid field in the given
1870   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871   known, it can just be copied to the lkb and the function will return
1872   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873   before it can be copied to the lkb.
1874
1875   When the rsb nodeid is being looked up remotely, the initial lkb
1876   causing the lookup is kept on the ls_waiters list waiting for the
1877   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878   on the rsb's res_lookup list until the master is verified.
1879
1880   Return values:
1881   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882   1: the rsb master is not available and the lkb has been placed on
1883      a wait queue
1884*/
1885
1886static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887{
1888        struct dlm_ls *ls = r->res_ls;
1889        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                r->res_first_lkid = lkb->lkb_id;
1894                lkb->lkb_nodeid = r->res_nodeid;
1895                return 0;
1896        }
1897
1898        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                return 1;
1901        }
1902
1903        if (r->res_nodeid == 0) {
1904                lkb->lkb_nodeid = 0;
1905                return 0;
1906        }
1907
1908        if (r->res_nodeid > 0) {
1909                lkb->lkb_nodeid = r->res_nodeid;
1910                return 0;
1911        }
1912
1913        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915        dir_nodeid = dlm_dir_nodeid(r);
1916
1917        if (dir_nodeid != our_nodeid) {
1918                r->res_first_lkid = lkb->lkb_id;
1919                send_lookup(r, lkb);
1920                return 1;
1921        }
1922
1923        for (i = 0; i < 2; i++) {
1924                /* It's possible for dlm_scand to remove an old rsb for
1925                   this same resource from the toss list, us to create
1926                   a new one, look up the master locally, and find it
1927                   already exists just before dlm_scand does the
1928                   dir_remove() on the previous rsb. */
1929
1930                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                       r->res_length, &ret_nodeid);
1932                if (!error)
1933                        break;
1934                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                schedule();
1936        }
1937        if (error && error != -EEXIST)
1938                return error;
1939
1940        if (ret_nodeid == our_nodeid) {
1941                r->res_first_lkid = 0;
1942                r->res_nodeid = 0;
1943                lkb->lkb_nodeid = 0;
1944        } else {
1945                r->res_first_lkid = lkb->lkb_id;
1946                r->res_nodeid = ret_nodeid;
1947                lkb->lkb_nodeid = ret_nodeid;
1948        }
1949        return 0;
1950}
1951
1952static void process_lookup_list(struct dlm_rsb *r)
1953{
1954        struct dlm_lkb *lkb, *safe;
1955
1956        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                list_del_init(&lkb->lkb_rsb_lookup);
1958                _request_lock(r, lkb);
1959                schedule();
1960        }
1961}
1962
1963/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965static void confirm_master(struct dlm_rsb *r, int error)
1966{
1967        struct dlm_lkb *lkb;
1968
1969        if (!r->res_first_lkid)
1970                return;
1971
1972        switch (error) {
1973        case 0:
1974        case -EINPROGRESS:
1975                r->res_first_lkid = 0;
1976                process_lookup_list(r);
1977                break;
1978
1979        case -EAGAIN:
1980        case -EBADR:
1981        case -ENOTBLK:
1982                /* the remote request failed and won't be retried (it was
1983                   a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                   lkb the first_lkid */
1985
1986                r->res_first_lkid = 0;
1987
1988                if (!list_empty(&r->res_lookup)) {
1989                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                         lkb_rsb_lookup);
1991                        list_del_init(&lkb->lkb_rsb_lookup);
1992                        r->res_first_lkid = lkb->lkb_id;
1993                        _request_lock(r, lkb);
1994                }
1995                break;
1996
1997        default:
1998                log_error(r->res_ls, "confirm_master unknown error %d", error);
1999        }
2000}
2001
2002static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                         int namelen, unsigned long timeout_cs,
2004                         void (*ast) (void *astparam),
2005                         void *astparam,
2006                         void (*bast) (void *astparam, int mode),
2007                         struct dlm_args *args)
2008{
2009        int rv = -EINVAL;
2010
2011        /* check for invalid arg usage */
2012
2013        if (mode < 0 || mode > DLM_LOCK_EX)
2014                goto out;
2015
2016        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                goto out;
2018
2019        if (flags & DLM_LKF_CANCEL)
2020                goto out;
2021
2022        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                goto out;
2024
2025        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                goto out;
2027
2028        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                goto out;
2030
2031        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                goto out;
2033
2034        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                goto out;
2036
2037        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                goto out;
2039
2040        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                goto out;
2042
2043        if (!ast || !lksb)
2044                goto out;
2045
2046        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                goto out;
2048
2049        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                goto out;
2051
2052        /* these args will be copied to the lkb in validate_lock_args,
2053           it cannot be done now because when converting locks, fields in
2054           an active lkb cannot be modified before locking the rsb */
2055
2056        args->flags = flags;
2057        args->astfn = ast;
2058        args->astparam = astparam;
2059        args->bastfn = bast;
2060        args->timeout = timeout_cs;
2061        args->mode = mode;
2062        args->lksb = lksb;
2063        rv = 0;
2064 out:
2065        return rv;
2066}
2067
2068static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069{
2070        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                      DLM_LKF_FORCEUNLOCK))
2072                return -EINVAL;
2073
2074        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                return -EINVAL;
2076
2077        args->flags = flags;
2078        args->astparam = astarg;
2079        return 0;
2080}
2081
2082static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                              struct dlm_args *args)
2084{
2085        int rv = -EINVAL;
2086
2087        if (args->flags & DLM_LKF_CONVERT) {
2088                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                        goto out;
2090
2091                if (args->flags & DLM_LKF_QUECVT &&
2092                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                        goto out;
2094
2095                rv = -EBUSY;
2096                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                        goto out;
2098
2099                if (lkb->lkb_wait_type)
2100                        goto out;
2101
2102                if (is_overlap(lkb))
2103                        goto out;
2104        }
2105
2106        lkb->lkb_exflags = args->flags;
2107        lkb->lkb_sbflags = 0;
2108        lkb->lkb_astfn = args->astfn;
2109        lkb->lkb_astparam = args->astparam;
2110        lkb->lkb_bastfn = args->bastfn;
2111        lkb->lkb_rqmode = args->mode;
2112        lkb->lkb_lksb = args->lksb;
2113        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114        lkb->lkb_ownpid = (int) current->pid;
2115        lkb->lkb_timeout_cs = args->timeout;
2116        rv = 0;
2117 out:
2118        if (rv)
2119                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                          lkb->lkb_status, lkb->lkb_wait_type,
2122                          lkb->lkb_resource->res_name);
2123        return rv;
2124}
2125
2126/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127   for success */
2128
2129/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130   because there may be a lookup in progress and it's valid to do
2131   cancel/unlockf on it */
2132
2133static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134{
2135        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136        int rv = -EINVAL;
2137
2138        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                dlm_print_lkb(lkb);
2141                goto out;
2142        }
2143
2144        /* an lkb may still exist even though the lock is EOL'ed due to a
2145           cancel, unlock or failed noqueue request; an app can't use these
2146           locks; return same error as if the lkid had not been found at all */
2147
2148        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                rv = -ENOENT;
2151                goto out;
2152        }
2153
2154        /* an lkb may be waiting for an rsb lookup to complete where the
2155           lookup was initiated by another lock */
2156
2157        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                        list_del_init(&lkb->lkb_rsb_lookup);
2161                        queue_cast(lkb->lkb_resource, lkb,
2162                                   args->flags & DLM_LKF_CANCEL ?
2163                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2164                        unhold_lkb(lkb); /* undoes create_lkb() */
2165                }
2166                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                rv = -EBUSY;
2168                goto out;
2169        }
2170
2171        /* cancel not allowed with another cancel/unlock in progress */
2172
2173        if (args->flags & DLM_LKF_CANCEL) {
2174                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                        goto out;
2176
2177                if (is_overlap(lkb))
2178                        goto out;
2179
2180                /* don't let scand try to do a cancel */
2181                del_timeout(lkb);
2182
2183                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                        rv = -EBUSY;
2186                        goto out;
2187                }
2188
2189                /* there's nothing to cancel */
2190                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                    !lkb->lkb_wait_type) {
2192                        rv = -EBUSY;
2193                        goto out;
2194                }
2195
2196                switch (lkb->lkb_wait_type) {
2197                case DLM_MSG_LOOKUP:
2198                case DLM_MSG_REQUEST:
2199                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                        rv = -EBUSY;
2201                        goto out;
2202                case DLM_MSG_UNLOCK:
2203                case DLM_MSG_CANCEL:
2204                        goto out;
2205                }
2206                /* add_to_waiters() will set OVERLAP_CANCEL */
2207                goto out_ok;
2208        }
2209
2210        /* do we need to allow a force-unlock if there's a normal unlock
2211           already in progress?  in what conditions could the normal unlock
2212           fail such that we'd want to send a force-unlock to be sure? */
2213
2214        if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                        goto out;
2217
2218                if (is_overlap_unlock(lkb))
2219                        goto out;
2220
2221                /* don't let scand try to do a cancel */
2222                del_timeout(lkb);
2223
2224                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                        rv = -EBUSY;
2227                        goto out;
2228                }
2229
2230                switch (lkb->lkb_wait_type) {
2231                case DLM_MSG_LOOKUP:
2232                case DLM_MSG_REQUEST:
2233                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                        rv = -EBUSY;
2235                        goto out;
2236                case DLM_MSG_UNLOCK:
2237                        goto out;
2238                }
2239                /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                goto out_ok;
2241        }
2242
2243        /* normal unlock not allowed if there's any op in progress */
2244        rv = -EBUSY;
2245        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                goto out;
2247
2248 out_ok:
2249        /* an overlapping op shouldn't blow away exflags from other op */
2250        lkb->lkb_exflags |= args->flags;
2251        lkb->lkb_sbflags = 0;
2252        lkb->lkb_astparam = args->astparam;
2253        rv = 0;
2254 out:
2255        if (rv)
2256                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                          args->flags, lkb->lkb_wait_type,
2259                          lkb->lkb_resource->res_name);
2260        return rv;
2261}
2262
2263/*
2264 * Four stage 4 varieties:
2265 * do_request(), do_convert(), do_unlock(), do_cancel()
2266 * These are called on the master node for the given lock and
2267 * from the central locking logic.
2268 */
2269
2270static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271{
2272        int error = 0;
2273
2274        if (can_be_granted(r, lkb, 1, NULL)) {
2275                grant_lock(r, lkb);
2276                queue_cast(r, lkb, 0);
2277                goto out;
2278        }
2279
2280        if (can_be_queued(lkb)) {
2281                error = -EINPROGRESS;
2282                add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                send_blocking_asts(r, lkb);
2284                add_timeout(lkb);
2285                goto out;
2286        }
2287
2288        error = -EAGAIN;
2289        if (force_blocking_asts(lkb))
2290                send_blocking_asts_all(r, lkb);
2291        queue_cast(r, lkb, -EAGAIN);
2292
2293 out:
2294        return error;
2295}
2296
2297static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2298{
2299        int error = 0;
2300        int deadlk = 0;
2301
2302        /* changing an existing lock may allow others to be granted */
2303
2304        if (can_be_granted(r, lkb, 1, &deadlk)) {
2305                grant_lock(r, lkb);
2306                queue_cast(r, lkb, 0);
2307                grant_pending_locks(r);
2308                goto out;
2309        }
2310
2311        /* can_be_granted() detected that this lock would block in a conversion
2312           deadlock, so we leave it on the granted queue and return EDEADLK in
2313           the ast for the convert. */
2314
2315        if (deadlk) {
2316                /* it's left on the granted queue */
2317                log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2318                          lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2319                          lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2320                revert_lock(r, lkb);
2321                queue_cast(r, lkb, -EDEADLK);
2322                error = -EDEADLK;
2323                goto out;
2324        }
2325
2326        /* is_demoted() means the can_be_granted() above set the grmode
2327           to NL, and left us on the granted queue.  This auto-demotion
2328           (due to CONVDEADLK) might mean other locks, and/or this lock, are
2329           now grantable.  We have to try to grant other converting locks
2330           before we try again to grant this one. */
2331
2332        if (is_demoted(lkb)) {
2333                grant_pending_convert(r, DLM_LOCK_IV, NULL);
2334                if (_can_be_granted(r, lkb, 1)) {
2335                        grant_lock(r, lkb);
2336                        queue_cast(r, lkb, 0);
2337                        grant_pending_locks(r);
2338                        goto out;
2339                }
2340                /* else fall through and move to convert queue */
2341        }
2342
2343        if (can_be_queued(lkb)) {
2344                error = -EINPROGRESS;
2345                del_lkb(r, lkb);
2346                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2347                send_blocking_asts(r, lkb);
2348                add_timeout(lkb);
2349                goto out;
2350        }
2351
2352        error = -EAGAIN;
2353        if (force_blocking_asts(lkb))
2354                send_blocking_asts_all(r, lkb);
2355        queue_cast(r, lkb, -EAGAIN);
2356
2357 out:
2358        return error;
2359}
2360
2361static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362{
2363        remove_lock(r, lkb);
2364        queue_cast(r, lkb, -DLM_EUNLOCK);
2365        grant_pending_locks(r);
2366        return -DLM_EUNLOCK;
2367}
2368
2369/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2370 
2371static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2372{
2373        int error;
2374
2375        error = revert_lock(r, lkb);
2376        if (error) {
2377                queue_cast(r, lkb, -DLM_ECANCEL);
2378                grant_pending_locks(r);
2379                return -DLM_ECANCEL;
2380        }
2381        return 0;
2382}
2383
2384/*
2385 * Four stage 3 varieties:
2386 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2387 */
2388
/* add a new lkb to a possibly new rsb, called by requesting process

   Returns a negative error from set_master(), 0 when set_master() reports
   that the lkb is waiting for the master to be determined, and otherwise
   the result of send_request() or do_request(). */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* set_master: sets lkb nodeid from r */
        int rv = set_master(r, lkb);

        /* negative: pass the error on; positive: the lkb is waiting, which
           is reported to the caller as 0 */
        if (rv)
                return rv < 0 ? rv : 0;

        /* receive_request() calls do_request() on remote node */
        return is_remote(r) ? send_request(r, lkb) : do_request(r, lkb);
}
2413
/* change some property of an existing lkb, e.g. mode

   Sends the conversion when is_remote(r) is true, otherwise performs it
   with do_convert(); returns that call's result. */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_convert() calls do_convert() on remote node */
        return is_remote(r) ? send_convert(r, lkb) : do_convert(r, lkb);
}
2428
/* remove an existing lkb from the granted queue

   Sends the unlock when is_remote(r) is true, otherwise performs it with
   do_unlock(); returns that call's result. */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_unlock() calls do_unlock() on remote node */
        return is_remote(r) ? send_unlock(r, lkb) : do_unlock(r, lkb);
}
2443
/* remove an existing lkb from the convert or wait queue

   Sends the cancel when is_remote(r) is true, otherwise performs it with
   do_cancel(); returns that call's result. */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        /* receive_cancel() calls do_cancel() on remote node */
        return is_remote(r) ? send_cancel(r, lkb) : do_cancel(r, lkb);
}
2458
2459/*
2460 * Four stage 2 varieties:
2461 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2462 */
2463
2464static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2465                        int len, struct dlm_args *args)
2466{
2467        struct dlm_rsb *r;
2468        int error;
2469
2470        error = validate_lock_args(ls, lkb, args);
2471        if (error)
2472                goto out;
2473
2474        error = find_rsb(ls, name, len, R_CREATE, &r);
2475        if (error)
2476                goto out;
2477
2478        lock_rsb(r);
2479
2480        attach_lkb(r, lkb);
2481        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2482
2483        error = _request_lock(r, lkb);
2484
2485        unlock_rsb(r);
2486        put_rsb(r);
2487
2488 out:
2489        return error;
2490}
2491
2492static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493                        struct dlm_args *args)
2494{
2495        struct dlm_rsb *r;
2496        int error;
2497
2498        r = lkb->lkb_resource;
2499
2500        hold_rsb(r);
2501        lock_rsb(r);
2502
2503        error = validate_lock_args(ls, lkb, args);
2504        if (error)
2505                goto out;
2506
2507        error = _convert_lock(r, lkb);
2508 out:
2509        unlock_rsb(r);
2510        put_rsb(r);
2511        return error;
2512}
2513
2514static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2515                       struct dlm_args *args)
2516{
2517        struct dlm_rsb *r;
2518        int error;
2519
2520        r = lkb->lkb_resource;
2521
2522        hold_rsb(r);
2523        lock_rsb(r);
2524
2525        error = validate_unlock_args(lkb, args);
2526        if (error)
2527                goto out;
2528
2529        error = _unlock_lock(r, lkb);
2530 out:
2531        unlock_rsb(r);
2532        put_rsb(r);
2533        return error;
2534}
2535
2536static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2537                       struct dlm_args *args)
2538{
2539        struct dlm_rsb *r;
2540        int error;
2541
2542        r = lkb->lkb_resource;
2543
2544        hold_rsb(r);
2545        lock_rsb(r);
2546
2547        error = validate_unlock_args(lkb, args);
2548        if (error)
2549                goto out;
2550
2551        error = _cancel_lock(r, lkb);
2552 out:
2553        unlock_rsb(r);
2554        put_rsb(r);
2555        return error;
2556}
2557
2558/*
2559 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2560 */
2561
2562int dlm_lock(dlm_lockspace_t *lockspace,
2563             int mode,
2564             struct dlm_lksb *lksb,
2565             uint32_t flags,
2566             void *name,
2567             unsigned int namelen,
2568             uint32_t parent_lkid,
2569             void (*ast) (void *astarg),
2570             void *astarg,
2571             void (*bast) (void *astarg, int mode))
2572{
2573        struct dlm_ls *ls;
2574        struct dlm_lkb *lkb;
2575        struct dlm_args args;
2576        int error, convert = flags & DLM_LKF_CONVERT;
2577
2578        ls = dlm_find_lockspace_local(lockspace);
2579        if (!ls)
2580                return -EINVAL;
2581
2582        dlm_lock_recovery(ls);
2583
2584        if (convert)
2585                error = find_lkb(ls, lksb->sb_lkid, &lkb);
2586        else
2587                error = create_lkb(ls, &lkb);
2588
2589        if (error)
2590                goto out;
2591
2592        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2593                              astarg, bast, &args);
2594        if (error)
2595                goto out_put;
2596
2597        if (convert)
2598                error = convert_lock(ls, lkb, &args);
2599        else
2600                error = request_lock(ls, lkb, name, namelen, &args);
2601
2602        if (error == -EINPROGRESS)
2603                error = 0;
2604 out_put:
2605        if (convert || error)
2606                __put_lkb(ls, lkb);
2607        if (error == -EAGAIN || error == -EDEADLK)
2608                error = 0;
2609 out:
2610        dlm_unlock_recovery(ls);
2611        dlm_put_lockspace(ls);
2612        return error;
2613}
2614
2615int dlm_unlock(dlm_lockspace_t *lockspace,
2616               uint32_t lkid,
2617               uint32_t flags,
2618               struct dlm_lksb *lksb,
2619               void *astarg)
2620{
2621        struct dlm_ls *ls;
2622        struct dlm_lkb *lkb;
2623        struct dlm_args args;
2624        int error;
2625
2626        ls = dlm_find_lockspace_local(lockspace);
2627        if (!ls)
2628                return -EINVAL;
2629
2630        dlm_lock_recovery(ls);
2631
2632        error = find_lkb(ls, lkid, &lkb);
2633        if (error)
2634                goto out;
2635
2636        error = set_unlock_args(flags, astarg, &args);
2637        if (error)
2638                goto out_put;
2639
2640        if (flags & DLM_LKF_CANCEL)
2641                error = cancel_lock(ls, lkb, &args);
2642        else
2643                error = unlock_lock(ls, lkb, &args);
2644
2645        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2646                error = 0;
2647        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2648                error = 0;
2649 out_put:
2650        dlm_put_lkb(lkb);
2651 out:
2652        dlm_unlock_recovery(ls);
2653        dlm_put_lockspace(ls);
2654        return error;
2655}
2656
2657/*
2658 * send/receive routines for remote operations and replies
2659 *
2660 * send_args
2661 * send_common
2662 * send_request                 receive_request
2663 * send_convert                 receive_convert
2664 * send_unlock                  receive_unlock
2665 * send_cancel                  receive_cancel
2666 * send_grant                   receive_grant
2667 * send_bast                    receive_bast
2668 * send_lookup                  receive_lookup
2669 * send_remove                  receive_remove
2670 *
2671 *                              send_common_reply
2672 * receive_request_reply        send_request_reply
2673 * receive_convert_reply        send_convert_reply
2674 * receive_unlock_reply         send_unlock_reply
2675 * receive_cancel_reply         send_cancel_reply
2676 * receive_lookup_reply         send_lookup_reply
2677 */
2678
2679static int _create_message(struct dlm_ls *ls, int mb_len,
2680                           int to_nodeid, int mstype,
2681                           struct dlm_message **ms_ret,
2682                           struct dlm_mhandle **mh_ret)
2683{
2684        struct dlm_message *ms;
2685        struct dlm_mhandle *mh;
2686        char *mb;
2687
2688        /* get_buffer gives us a message handle (mh) that we need to
2689           pass into lowcomms_commit and a message buffer (mb) that we
2690           write our data into */
2691
2692        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2693        if (!mh)
2694                return -ENOBUFS;
2695
2696        memset(mb, 0, mb_len);
2697
2698        ms = (struct dlm_message *) mb;
2699
2700        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2701        ms->m_header.h_lockspace = ls->ls_global_id;
2702        ms->m_header.h_nodeid = dlm_our_nodeid();
2703        ms->m_header.h_length = mb_len;
2704        ms->m_header.h_cmd = DLM_MSG;
2705
2706        ms->m_type = mstype;
2707
2708        *mh_ret = mh;
2709        *ms_ret = ms;
2710        return 0;
2711}
2712
2713static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2714                          int to_nodeid, int mstype,
2715                          struct dlm_message **ms_ret,
2716                          struct dlm_mhandle **mh_ret)
2717{
2718        int mb_len = sizeof(struct dlm_message);
2719
2720        switch (mstype) {
2721        case DLM_MSG_REQUEST:
2722        case DLM_MSG_LOOKUP:
2723        case DLM_MSG_REMOVE:
2724                mb_len += r->res_length;
2725                break;
2726        case DLM_MSG_CONVERT:
2727        case DLM_MSG_UNLOCK:
2728        case DLM_MSG_REQUEST_REPLY:
2729        case DLM_MSG_CONVERT_REPLY:
2730        case DLM_MSG_GRANT:
2731                if (lkb && lkb->lkb_lvbptr)
2732                        mb_len += r->res_ls->ls_lvblen;
2733                break;
2734        }
2735
2736        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2737                               ms_ret, mh_ret);
2738}
2739
/* Pass the message through dlm_message_out() and commit its lowcomms buffer.
   Always returns 0 today; further lowcomms enhancements or alternate
   implementations may make the return value useful at some point, so
   callers check it. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	int rv = 0;

	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return rv;
}
2749
2750static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2751                      struct dlm_message *ms)
2752{
2753        ms->m_nodeid   = lkb->lkb_nodeid;
2754        ms->m_pid      = lkb->lkb_ownpid;
2755        ms->m_lkid     = lkb->lkb_id;
2756        ms->m_remid    = lkb->lkb_remid;
2757        ms->m_exflags  = lkb->lkb_exflags;
2758        ms->m_sbflags  = lkb->lkb_sbflags;
2759        ms->m_flags    = lkb->lkb_flags;
2760        ms->m_lvbseq   = lkb->lkb_lvbseq;
2761        ms->m_status   = lkb->lkb_status;
2762        ms->m_grmode   = lkb->lkb_grmode;
2763        ms->m_rqmode   = lkb->lkb_rqmode;
2764        ms->m_hash     = r->res_hash;
2765
2766        /* m_result and m_bastmode are set from function args,
2767           not from lkb fields */
2768
2769        if (lkb->lkb_bastfn)
2770                ms->m_asts |= AST_BAST;
2771        if (lkb->lkb_astfn)
2772                ms->m_asts |= AST_COMP;
2773
2774        /* compare with switch in create_message; send_remove() doesn't
2775           use send_args() */
2776
2777        switch (ms->m_type) {
2778        case DLM_MSG_REQUEST:
2779        case DLM_MSG_LOOKUP:
2780                memcpy(ms->m_extra, r->res_name, r->res_length);
2781                break;
2782        case DLM_MSG_CONVERT:
2783        case DLM_MSG_UNLOCK:
2784        case DLM_MSG_REQUEST_REPLY:
2785        case DLM_MSG_CONVERT_REPLY:
2786        case DLM_MSG_GRANT:
2787                if (!lkb->lkb_lvbptr)
2788                        break;
2789                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2790                break;
2791        }
2792}
2793
2794static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2795{
2796        struct dlm_message *ms;
2797        struct dlm_mhandle *mh;
2798        int to_nodeid, error;
2799
2800        error = add_to_waiters(lkb, mstype);
2801        if (error)
2802                return error;
2803
2804        to_nodeid = r->res_nodeid;
2805
2806        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2807        if (error)
2808                goto fail;
2809
2810        send_args(r, lkb, ms);
2811
2812        error = send_message(mh, ms);
2813        if (error)
2814                goto fail;
2815        return 0;
2816
2817 fail:
2818        remove_from_waiters(lkb, msg_reply_type(mstype));
2819        return error;
2820}
2821
2822static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2823{
2824        return send_common(r, lkb, DLM_MSG_REQUEST);
2825}
2826
2827static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2828{
2829        int error;
2830
2831        error = send_common(r, lkb, DLM_MSG_CONVERT);
2832
2833        /* down conversions go without a reply from the master */
2834        if (!error && down_conversion(lkb)) {
2835                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2836                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2837                r->res_ls->ls_stub_ms.m_result = 0;
2838                r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2839                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2840        }
2841
2842        return error;
2843}
2844
2845/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2846   MASTER_UNCERTAIN to force the next request on the rsb to confirm
2847   that the master is still correct. */
2848
2849static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2850{
2851        return send_common(r, lkb, DLM_MSG_UNLOCK);
2852}
2853
2854static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855{
2856        return send_common(r, lkb, DLM_MSG_CANCEL);
2857}
2858
2859static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2860{
2861        struct dlm_message *ms;
2862        struct dlm_mhandle *mh;
2863        int to_nodeid, error;
2864
2865        to_nodeid = lkb->lkb_nodeid;
2866
2867        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2868        if (error)
2869                goto out;
2870
2871        send_args(r, lkb, ms);
2872
2873        ms->m_result = 0;
2874
2875        error = send_message(mh, ms);
2876 out:
2877        return error;
2878}
2879
2880static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2881{
2882        struct dlm_message *ms;
2883        struct dlm_mhandle *mh;
2884        int to_nodeid, error;
2885
2886        to_nodeid = lkb->lkb_nodeid;
2887
2888        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2889        if (error)
2890                goto out;
2891
2892        send_args(r, lkb, ms);
2893
2894        ms->m_bastmode = mode;
2895
2896        error = send_message(mh, ms);
2897 out:
2898        return error;
2899}
2900
2901static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902{
2903        struct dlm_message *ms;
2904        struct dlm_mhandle *mh;
2905        int to_nodeid, error;
2906
2907        error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2908        if (error)
2909                return error;
2910
2911        to_nodeid = dlm_dir_nodeid(r);
2912
2913        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2914        if (error)
2915                goto fail;
2916
2917        send_args(r, lkb, ms);
2918
2919        error = send_message(mh, ms);
2920        if (error)
2921                goto fail;
2922        return 0;
2923
2924 fail:
2925        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2926        return error;
2927}
2928
2929static int send_remove(struct dlm_rsb *r)
2930{
2931        struct dlm_message *ms;
2932        struct dlm_mhandle *mh;
2933        int to_nodeid, error;
2934
2935        to_nodeid = dlm_dir_nodeid(r);
2936
2937        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2938        if (error)
2939                goto out;
2940
2941        memcpy(ms->m_extra, r->res_name, r->res_length);
2942        ms->m_hash = r->res_hash;
2943
2944        error = send_message(mh, ms);
2945 out:
2946        return error;
2947}
2948
2949static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2950                             int mstype, int rv)
2951{
2952        struct dlm_message *ms;
2953        struct dlm_mhandle *mh;
2954        int to_nodeid, error;
2955
2956        to_nodeid = lkb->lkb_nodeid;
2957
2958        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2959        if (error)
2960                goto out;
2961
2962        send_args(r, lkb, ms);
2963
2964        ms->m_result = rv;
2965
2966        error = send_message(mh, ms);
2967 out:
2968        return error;
2969}
2970
2971static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2972{
2973        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2974}
2975
2976static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2977{
2978        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2979}
2980
2981static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2982{
2983        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2984}
2985
2986static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2987{
2988        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2989}
2990
2991static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2992                             int ret_nodeid, int rv)
2993{
2994        struct dlm_rsb *r = &ls->ls_stub_rsb;
2995        struct dlm_message *ms;
2996        struct dlm_mhandle *mh;
2997        int error, nodeid = ms_in->m_header.h_nodeid;
2998
2999        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3000        if (error)
3001                goto out;
3002
3003        ms->m_lkid = ms_in->m_lkid;
3004        ms->m_result = rv;
3005        ms->m_nodeid = ret_nodeid;
3006
3007        error = send_message(mh, ms);
3008 out:
3009        return error;
3010}
3011
3012/* which args we save from a received message depends heavily on the type
3013   of message, unlike the send side where we can safely send everything about
3014   the lkb for any type of message */
3015
3016static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3017{
3018        lkb->lkb_exflags = ms->m_exflags;
3019        lkb->lkb_sbflags = ms->m_sbflags;
3020        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3021                         (ms->m_flags & 0x0000FFFF);
3022}
3023
3024static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3025{
3026        lkb->lkb_sbflags = ms->m_sbflags;
3027        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3028                         (ms->m_flags & 0x0000FFFF);
3029}
3030
3031static int receive_extralen(struct dlm_message *ms)
3032{
3033        return (ms->m_header.h_length - sizeof(struct dlm_message));
3034}
3035
3036static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3037                       struct dlm_message *ms)
3038{
3039        int len;
3040
3041        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3042                if (!lkb->lkb_lvbptr)
3043                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3044                if (!lkb->lkb_lvbptr)
3045                        return -ENOMEM;
3046                len = receive_extralen(ms);
3047                if (len > DLM_RESNAME_MAXLEN)
3048                        len = DLM_RESNAME_MAXLEN;
3049                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3050        }
3051        return 0;
3052}
3053
/* Placeholder blocking callback; receive_request_args() installs it on an
   lkb when the request message has AST_BAST set.  It only logs a message
   if it is ever invoked. */

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3058
/* Placeholder completion callback; receive_request_args() installs it on an
   lkb when the request message has AST_COMP set.  It only logs a message
   if it is ever invoked. */

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3063
3064static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3065                                struct dlm_message *ms)
3066{
3067        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3068        lkb->lkb_ownpid = ms->m_pid;
3069        lkb->lkb_remid = ms->m_lkid;
3070        lkb->lkb_grmode = DLM_LOCK_IV;
3071        lkb->lkb_rqmode = ms->m_rqmode;
3072
3073        lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3074        lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3075
3076        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3077                /* lkb was just created so there won't be an lvb yet */
3078                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3079                if (!lkb->lkb_lvbptr)
3080                        return -ENOMEM;
3081        }
3082
3083        return 0;
3084}
3085
3086static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3087                                struct dlm_message *ms)
3088{
3089        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3090                return -EBUSY;
3091
3092        if (receive_lvb(ls, lkb, ms))
3093                return -ENOMEM;
3094
3095        lkb->lkb_rqmode = ms->m_rqmode;
3096        lkb->lkb_lvbseq = ms->m_lvbseq;
3097
3098        return 0;
3099}
3100
/* Apply an unlock message to an lkb; the only thing taken from it here is
   the lvb.  Returns 0, or -ENOMEM if receive_lvb() fails. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3108
3109/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3110   uses to send a reply and that the remote end uses to process the reply. */
3111
3112static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3113{
3114        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3115        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3116        lkb->lkb_remid = ms->m_lkid;
3117}
3118
3119/* This is called after the rsb is locked so that we can safely inspect
3120   fields in the lkb. */
3121
3122static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3123{
3124        int from = ms->m_header.h_nodeid;
3125        int error = 0;
3126
3127        switch (ms->m_type) {
3128        case DLM_MSG_CONVERT:
3129        case DLM_MSG_UNLOCK:
3130        case DLM_MSG_CANCEL:
3131                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3132                        error = -EINVAL;
3133                break;
3134
3135        case DLM_MSG_CONVERT_REPLY:
3136        case DLM_MSG_UNLOCK_REPLY:
3137        case DLM_MSG_CANCEL_REPLY:
3138        case DLM_MSG_GRANT:
3139        case DLM_MSG_BAST:
3140                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3141                        error = -EINVAL;
3142                break;
3143
3144        case DLM_MSG_REQUEST_REPLY:
3145                if (!is_process_copy(lkb))
3146                        error = -EINVAL;
3147                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3148                        error = -EINVAL;
3149                break;
3150
3151        default:
3152                error = -EINVAL;
3153        }
3154
3155        if (error)
3156                log_error(lkb->lkb_resource->res_ls,
3157                          "ignore invalid message %d from %d %x %x %x %d",
3158                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3159                          lkb->lkb_flags, lkb->lkb_nodeid);
3160        return error;
3161}
3162
3163static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3164{
3165        struct dlm_lkb *lkb;
3166        struct dlm_rsb *r;
3167        int error, namelen;
3168
3169        error = create_lkb(ls, &lkb);
3170        if (error)
3171                goto fail;
3172
3173        receive_flags(lkb, ms);
3174        lkb->lkb_flags |= DLM_IFL_MSTCPY;
3175        error = receive_request_args(ls, lkb, ms);
3176        if (error) {
3177                __put_lkb(ls, lkb);
3178                goto fail;
3179        }
3180
3181        namelen = receive_extralen(ms);
3182
3183        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3184        if (error) {
3185                __put_lkb(ls, lkb);
3186                goto fail;
3187        }
3188
3189        lock_rsb(r);
3190
3191        attach_lkb(r, lkb);
3192        error = do_request(r, lkb);
3193        send_request_reply(r, lkb, error);
3194
3195        unlock_rsb(r);
3196        put_rsb(r);
3197
3198        if (error == -EINPROGRESS)
3199                error = 0;
3200        if (error)
3201                dlm_put_lkb(lkb);
3202        return;
3203
3204 fail:
3205        setup_stub_lkb(ls, ms);
3206        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3207}
3208
3209static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3210{
3211        struct dlm_lkb *lkb;
3212        struct dlm_rsb *r;
3213        int error, reply = 1;
3214
3215        error = find_lkb(ls, ms->m_remid, &lkb);
3216        if (error)
3217                goto fail;
3218
3219        r = lkb->lkb_resource;
3220
3221        hold_rsb(r);
3222        lock_rsb(r);
3223
3224        error = validate_message(lkb, ms);
3225        if (error)
3226                goto out;
3227
3228        receive_flags(lkb, ms);
3229        error = receive_convert_args(ls, lkb, ms);
3230        if (error)
3231                goto out_reply;
3232        reply = !down_conversion(lkb);
3233
3234        error = do_convert(r, lkb);
3235 out_reply:
3236        if (reply)
3237                send_convert_reply(r, lkb, error);
3238 out:
3239        unlock_rsb(r);
3240        put_rsb(r);
3241        dlm_put_lkb(lkb);
3242        return;
3243
3244 fail:
3245        setup_stub_lkb(ls, ms);
3246        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3247}
3248
3249static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3250{
3251        struct dlm_lkb *lkb;
3252        struct dlm_rsb *r;
3253        int error;
3254
3255        error = find_lkb(ls, ms->m_remid, &lkb);
3256        if (error)
3257                goto fail;
3258
3259        r = lkb->lkb_resource;
3260
3261        hold_rsb(r);
3262        lock_rsb(r);
3263
3264        error = validate_message(lkb, ms);
3265        if (error)
3266                goto out;
3267
3268        receive_flags(lkb, ms);
3269        error = receive_unlock_args(ls, lkb, ms);
3270        if (error)
3271                goto out_reply;
3272
3273        error = do_unlock(r, lkb);
3274 out_reply:
3275        send_unlock_reply(r, lkb, error);
3276 out:
3277        unlock_rsb(r);
3278        put_rsb(r);
3279        dlm_put_lkb(lkb);
3280        return;
3281
3282 fail:
3283        setup_stub_lkb(ls, ms);
3284        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3285}
3286
3287static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3288{
3289        struct dlm_lkb *lkb;
3290        struct dlm_rsb *r;
3291        int error;
3292
3293        error = find_lkb(ls, ms->m_remid, &lkb);
3294        if (error)
3295                goto fail;
3296
3297        receive_flags(lkb, ms);
3298
3299        r = lkb->lkb_resource;
3300
3301        hold_rsb(r);
3302        lock_rsb(r);
3303
3304        error = validate_message(lkb, ms);
3305        if (error)
3306                goto out;
3307
3308        error = do_cancel(r, lkb);
3309        send_cancel_reply(r, lkb, error);
3310 out:
3311        unlock_rsb(r);
3312        put_rsb(r);
3313        dlm_put_lkb(lkb);
3314        return;
3315
3316 fail:
3317        setup_stub_lkb(ls, ms);
3318        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3319}
3320
3321static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3322{
3323        struct dlm_lkb *lkb;
3324        struct dlm_rsb *r;
3325        int error;
3326
3327        error = find_lkb(ls, ms->m_remid, &lkb);
3328        if (error) {
3329                log_debug(ls, "receive_grant from %d no lkb %x",
3330                          ms->m_header.h_nodeid, ms->m_remid);
3331                return;
3332        }
3333
3334        r = lkb->lkb_resource;
3335
3336        hold_rsb(r);
3337        lock_rsb(r);
3338
3339        error = validate_message(lkb, ms);
3340        if (error)
3341                goto out;
3342
3343        receive_flags_reply(lkb, ms);
3344        if (is_altmode(lkb))
3345                munge_altmode(lkb, ms);
3346        grant_lock_pc(r, lkb, ms);
3347        queue_cast(r, lkb, 0);
3348 out:
3349        unlock_rsb(r);
3350        put_rsb(r);
3351        dlm_put_lkb(lkb);
3352}
3353
3354static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3355{
3356        struct dlm_lkb *lkb;
3357        struct dlm_rsb *r;
3358        int error;
3359
3360        error = find_lkb(ls, ms->m_remid, &lkb);
3361        if (error) {
3362                log_debug(ls, "receive_bast from %d no lkb %x",
3363                          ms->m_header.h_nodeid, ms->m_remid);
3364                return;
3365        }
3366
3367        r = lkb->lkb_resource;
3368
3369        hold_rsb(r);
3370        lock_rsb(r);
3371
3372        error = validate_message(lkb, ms);
3373        if (error)
3374                goto out;
3375
3376        queue_bast(r, lkb, ms->m_bastmode);
3377 out:
3378        unlock_rsb(r);
3379        put_rsb(r);
3380        dlm_put_lkb(lkb);
3381}
3382
3383static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3384{
3385        int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3386
3387        from_nodeid = ms->m_header.h_nodeid;
3388        our_nodeid = dlm_our_nodeid();
3389
3390        len = receive_extralen(ms);
3391
3392        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3393        if (dir_nodeid != our_nodeid) {
3394                log_error(ls, "lookup dir_nodeid %d from %d",
3395                          dir_nodeid, from_nodeid);
3396                error = -EINVAL;
3397                ret_nodeid = -1;
3398                goto out;
3399        }
3400
3401        error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3402
3403        /* Optimization: we're master so treat lookup as a request */
3404        if (!error && ret_nodeid == our_nodeid) {
3405                receive_request(ls, ms);
3406                return;
3407        }
3408 out:
3409        send_lookup_reply(ls, ms, ret_nodeid, error);
3410}
3411
3412static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3413{
3414        int len, dir_nodeid, from_nodeid;
3415
3416        from_nodeid = ms->m_header.h_nodeid;
3417
3418        len = receive_extralen(ms);
3419
3420        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3421        if (dir_nodeid != dlm_our_nodeid()) {
3422                log_error(ls, "remove dir entry dir_nodeid %d from %d",
3423                          dir_nodeid, from_nodeid);
3424                return;
3425        }
3426
3427        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3428}
3429
3430static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3431{
3432        do_purge(ls, ms->m_nodeid, ms->m_pid);
3433}
3434
3435static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3436{
3437        struct dlm_lkb *lkb;
3438        struct dlm_rsb *r;
3439        int error, mstype, result;
3440
3441        error = find_lkb(ls, ms->m_remid, &lkb);
3442        if (error) {
3443                log_debug(ls, "receive_request_reply from %d no lkb %x",
3444                          ms->m_header.h_nodeid, ms->m_remid);
3445                return;
3446        }
3447
3448        r = lkb->lkb_resource;
3449        hold_rsb(r);
3450        lock_rsb(r);
3451
3452        error = validate_message(lkb, ms);
3453        if (error)
3454                goto out;
3455
3456        mstype = lkb->lkb_wait_type;
3457        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3458        if (error)
3459                goto out;
3460
3461        /* Optimization: the dir node was also the master, so it took our
3462           lookup as a request and sent request reply instead of lookup reply */
3463        if (mstype == DLM_MSG_LOOKUP) {
3464                r->res_nodeid = ms->m_header.h_nodeid;
3465                lkb->lkb_nodeid = r->res_nodeid;
3466        }
3467
3468        /* this is the value returned from do_request() on the master */
3469        result = ms->m_result;
3470
3471        switch (result) {
3472        case -EAGAIN:
3473                /* request would block (be queued) on remote master */
3474                queue_cast(r, lkb, -EAGAIN);
3475                confirm_master(r, -EAGAIN);
3476                unhold_lkb(lkb); /* undoes create_lkb() */
3477                break;
3478
3479        case -EINPROGRESS:
3480        case 0:
3481                /* request was queued or granted on remote master */
3482                receive_flags_reply(lkb, ms);
3483                lkb->lkb_remid = ms->m_lkid;
3484                if (is_altmode(lkb))
3485                        munge_altmode(lkb, ms);
3486                if (result) {
3487                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
3488                        add_timeout(lkb);
3489                } else {
3490                        grant_lock_pc(r, lkb, ms);
3491                        queue_cast(r, lkb, 0);
3492                }
3493                confirm_master(r, result);
3494                break;
3495
3496        case -EBADR:
3497        case -ENOTBLK:
3498                /* find_rsb failed to find rsb or rsb wasn't master */
3499                log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3500                          lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3501                r->res_nodeid = -1;
3502                lkb->lkb_nodeid = -1;
3503
3504                if (is_overlap(lkb)) {
3505                        /* we'll ignore error in cancel/unlock reply */
3506                        queue_cast_overlap(r, lkb);
3507                        confirm_master(r, result);
3508                        unhold_lkb(lkb); /* undoes create_lkb() */
3509                } else
3510                        _request_lock(r, lkb);
3511                break;
3512
3513        default:
3514                log_error(ls, "receive_request_reply %x error %d",
3515                          lkb->lkb_id, result);
3516        }
3517
3518        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3519                log_debug(ls, "receive_request_reply %x result %d unlock",
3520                          lkb->lkb_id, result);
3521                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3522                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3523                send_unlock(r, lkb);
3524        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3525                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3526                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3527                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3528                send_cancel(r, lkb);
3529        } else {
3530                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3531                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3532        }
3533 out:
3534        unlock_rsb(r);
3535        put_rsb(r);
3536        dlm_put_lkb(lkb);
3537}
3538
3539static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3540                                    struct dlm_message *ms)
3541{
3542        /* this is the value returned from do_convert() on the master */
3543        switch (ms->m_result) {
3544        case -EAGAIN:
3545                /* convert would block (be queued) on remote master */
3546                queue_cast(r, lkb, -EAGAIN);
3547                break;
3548
3549        case -EDEADLK:
3550                receive_flags_reply(lkb, ms);
3551                revert_lock_pc(r, lkb);
3552                queue_cast(r, lkb, -EDEADLK);
3553                break;
3554
3555        case -EINPROGRESS:
3556                /* convert was queued on remote master */
3557                receive_flags_reply(lkb, ms);
3558                if (is_demoted(lkb))
3559                        munge_demoted(lkb, ms);
3560                del_lkb(r, lkb);
3561                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3562                add_timeout(lkb);
3563                break;
3564
3565        case 0:
3566                /* convert was granted on remote master */
3567                receive_flags_reply(lkb, ms);
3568                if (is_demoted(lkb))
3569                        munge_demoted(lkb, ms);
3570                grant_lock_pc(r, lkb, ms);
3571                queue_cast(r, lkb, 0);
3572                break;
3573
3574        default:
3575                log_error(r->res_ls, "receive_convert_reply %x error %d",
3576                          lkb->lkb_id, ms->m_result);
3577        }
3578}
3579
3580static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3581{
3582        struct dlm_rsb *r = lkb->lkb_resource;
3583        int error;
3584
3585        hold_rsb(r);
3586        lock_rsb(r);
3587
3588        error = validate_message(lkb, ms);
3589        if (error)
3590                goto out;
3591
3592        /* stub reply can happen with waiters_mutex held */
3593        error = remove_from_waiters_ms(lkb, ms);
3594        if (error)
3595                goto out;
3596
3597        __receive_convert_reply(r, lkb, ms);
3598 out:
3599        unlock_rsb(r);
3600        put_rsb(r);
3601}
3602
3603static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3604{
3605        struct dlm_lkb *lkb;
3606        int error;
3607
3608        error = find_lkb(ls, ms->m_remid, &lkb);
3609        if (error) {
3610                log_debug(ls, "receive_convert_reply from %d no lkb %x",
3611                          ms->m_header.h_nodeid, ms->m_remid);
3612                return;
3613        }
3614
3615        _receive_convert_reply(lkb, ms);
3616        dlm_put_lkb(lkb);
3617}
3618
3619static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3620{
3621        struct dlm_rsb *r = lkb->lkb_resource;
3622        int error;
3623
3624        hold_rsb(r);
3625        lock_rsb(r);
3626
3627        error = validate_message(lkb, ms);
3628        if (error)
3629                goto out;
3630
3631        /* stub reply can happen with waiters_mutex held */
3632        error = remove_from_waiters_ms(lkb, ms);
3633        if (error)
3634                goto out;
3635
3636        /* this is the value returned from do_unlock() on the master */
3637
3638        switch (ms->m_result) {
3639        case -DLM_EUNLOCK:
3640                receive_flags_reply(lkb, ms);
3641                remove_lock_pc(r, lkb);
3642                queue_cast(r, lkb, -DLM_EUNLOCK);
3643                break;
3644        case -ENOENT:
3645                break;
3646        default:
3647                log_error(r->res_ls, "receive_unlock_reply %x error %d",
3648                          lkb->lkb_id, ms->m_result);
3649        }
3650 out:
3651        unlock_rsb(r);
3652        put_rsb(r);
3653}
3654
3655static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656{
3657        struct dlm_lkb *lkb;
3658        int error;
3659
3660        error = find_lkb(ls, ms->m_remid, &lkb);
3661        if (error) {
3662                log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3663                          ms->m_header.h_nodeid, ms->m_remid);
3664                return;
3665        }
3666
3667        _receive_unlock_reply(lkb, ms);
3668        dlm_put_lkb(lkb);
3669}
3670
3671static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3672{
3673        struct dlm_rsb *r = lkb->lkb_resource;
3674        int error;
3675
3676        hold_rsb(r);
3677        lock_rsb(r);
3678
3679        error = validate_message(lkb, ms);
3680        if (error)
3681                goto out;
3682
3683        /* stub reply can happen with waiters_mutex held */
3684        error = remove_from_waiters_ms(lkb, ms);
3685        if (error)
3686                goto out;
3687
3688        /* this is the value returned from do_cancel() on the master */
3689
3690        switch (ms->m_result) {
3691        case -DLM_ECANCEL:
3692                receive_flags_reply(lkb, ms);
3693                revert_lock_pc(r, lkb);
3694                queue_cast(r, lkb, -DLM_ECANCEL);
3695                break;
3696        case 0:
3697                break;
3698        default:
3699                log_error(r->res_ls, "receive_cancel_reply %x error %d",
3700                          lkb->lkb_id, ms->m_result);
3701        }
3702 out:
3703        unlock_rsb(r);
3704        put_rsb(r);
3705}
3706
3707static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3708{
3709        struct dlm_lkb *lkb;
3710        int error;
3711
3712        error = find_lkb(ls, ms->m_remid, &lkb);
3713        if (error) {
3714                log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3715                          ms->m_header.h_nodeid, ms->m_remid);
3716                return;
3717        }
3718
3719        _receive_cancel_reply(lkb, ms);
3720        dlm_put_lkb(lkb);
3721}
3722
3723static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3724{
3725        struct dlm_lkb *lkb;
3726        struct dlm_rsb *r;
3727        int error, ret_nodeid;
3728
3729        error = find_lkb(ls, ms->m_lkid, &lkb);
3730        if (error) {
3731                log_error(ls, "receive_lookup_reply no lkb");
3732                return;
3733        }
3734
3735        /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3736           FIXME: will a non-zero error ever be returned? */
3737
3738        r = lkb->lkb_resource;
3739        hold_rsb(r);
3740        lock_rsb(r);
3741
3742        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3743        if (error)
3744                goto out;
3745
3746        ret_nodeid = ms->m_nodeid;
3747        if (ret_nodeid == dlm_our_nodeid()) {
3748                r->res_nodeid = 0;
3749                ret_nodeid = 0;
3750                r->res_first_lkid = 0;
3751        } else {
3752                /* set_master() will copy res_nodeid to lkb_nodeid */
3753                r->res_nodeid = ret_nodeid;
3754        }
3755
3756        if (is_overlap(lkb)) {
3757                log_debug(ls, "receive_lookup_reply %x unlock %x",
3758                          lkb->lkb_id, lkb->lkb_flags);
3759                queue_cast_overlap(r, lkb);
3760                unhold_lkb(lkb); /* undoes create_lkb() */
3761                goto out_list;
3762        }
3763
3764        _request_lock(r, lkb);
3765
3766 out_list:
3767        if (!ret_nodeid)
3768                process_lookup_list(r);
3769 out:
3770        unlock_rsb(r);
3771        put_rsb(r);
3772        dlm_put_lkb(lkb);
3773}
3774
3775static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3776{
3777        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3778                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3779                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3780                          ms->m_remid, ms->m_result);
3781                return;
3782        }
3783
3784        switch (ms->m_type) {
3785
3786        /* messages sent to a master node */
3787
3788        case DLM_MSG_REQUEST:
3789                receive_request(ls, ms);
3790                break;
3791
3792        case DLM_MSG_CONVERT:
3793                receive_convert(ls, ms);
3794                break;
3795
3796        case DLM_MSG_UNLOCK:
3797                receive_unlock(ls, ms);
3798                break;
3799
3800        case DLM_MSG_CANCEL:
3801                receive_cancel(ls, ms);
3802                break;
3803
3804        /* messages sent from a master node (replies to above) */
3805
3806        case DLM_MSG_REQUEST_REPLY:
3807                receive_request_reply(ls, ms);
3808                break;
3809
3810        case DLM_MSG_CONVERT_REPLY:
3811                receive_convert_reply(ls, ms);
3812                break;
3813
3814        case DLM_MSG_UNLOCK_REPLY:
3815                receive_unlock_reply(ls, ms);
3816                break;
3817
3818        case DLM_MSG_CANCEL_REPLY:
3819                receive_cancel_reply(ls, ms);
3820                break;
3821
3822        /* messages sent from a master node (only two types of async msg) */
3823
3824        case DLM_MSG_GRANT:
3825                receive_grant(ls, ms);
3826                break;
3827
3828        case DLM_MSG_BAST:
3829                receive_bast(ls, ms);
3830                break;
3831
3832        /* messages sent to a dir node */
3833
3834        case DLM_MSG_LOOKUP:
3835                receive_lookup(ls, ms);
3836                break;
3837
3838        case DLM_MSG_REMOVE:
3839                receive_remove(ls, ms);
3840                break;
3841
3842        /* messages sent from a dir node (remove has no reply) */
3843
3844        case DLM_MSG_LOOKUP_REPLY:
3845                receive_lookup_reply(ls, ms);
3846                break;
3847
3848        /* other messages */
3849
3850        case DLM_MSG_PURGE:
3851                receive_purge(ls, ms);
3852                break;
3853
3854        default:
3855                log_error(ls, "unknown message type %d", ms->m_type);
3856        }
3857
3858        dlm_astd_wake();
3859}
3860
/* While the lockspace is in recovery mode (locking stopped), normal
   messages are not processed; they are saved on the requestqueue and
   handled once recovery is done.  Outside of recovery we first wait for
   dlm_recoverd to drain the saved messages off the requestqueue and only
   then process the new one.  That wait matters right after recovery
   completes, when we move from saving every message, to processing the
   saved messages, to processing new messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
		return;
	}

	dlm_wait_requestqueue(ls);
	_receive_message(ls, ms);
}
3879
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message goes straight to _receive_message() without
   the locking-stopped check or requestqueue wait done for new messages. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3887
3888/* This is called by the midcomms layer when something is received for
3889   the lockspace.  It could be either a MSG (normal message sent as part of
3890   standard locking activity) or an RCOM (recovery message sent as part of
3891   lockspace recovery). */
3892
3893void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3894{
3895        struct dlm_header *hd = &p->header;
3896        struct dlm_ls *ls;
3897        int type = 0;
3898
3899        switch (hd->h_cmd) {
3900        case DLM_MSG:
3901                dlm_message_in(&p->message);
3902                type = p->message.m_type;
3903                break;
3904        case DLM_RCOM:
3905                dlm_rcom_in(&p->rcom);
3906                type = p->rcom.rc_type;
3907                break;
3908        default:
3909                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3910                return;
3911        }
3912
3913        if (hd->h_nodeid != nodeid) {
3914                log_print("invalid h_nodeid %d from %d lockspace %x",
3915                          hd->h_nodeid, nodeid, hd->h_lockspace);
3916                return;
3917        }
3918
3919        ls = dlm_find_lockspace_global(hd->h_lockspace);
3920        if (!ls) {
3921                if (dlm_config.ci_log_debug)
3922                        log_print("invalid lockspace %x from %d cmd %d type %d",
3923                                  hd->h_lockspace, nodeid, hd->h_cmd, type);
3924
3925                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3926                        dlm_send_ls_not_ready(nodeid, &p->rcom);
3927                return;
3928        }
3929
3930        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3931           be inactive (in this ls) before transitioning to recovery mode */
3932
3933        down_read(&ls->ls_recv_active);
3934        if (hd->h_cmd == DLM_MSG)
3935                dlm_receive_message(ls, &p->message, nodeid);
3936        else
3937                dlm_receive_rcom(ls, &p->rcom, nodeid);
3938        up_read(&ls->ls_recv_active);
3939
3940        dlm_put_lockspace(ls);
3941}
3942
3943static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3944{
3945        if (middle_conversion(lkb)) {
3946                hold_lkb(lkb);
3947                ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3948                ls->ls_stub_ms.m_result = -EINPROGRESS;
3949                ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3950                ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3951                _receive_convert_reply(lkb, &ls->ls_stub_ms);
3952
3953                /* Same special case as in receive_rcom_lock_args() */
3954                lkb->lkb_grmode = DLM_LOCK_IV;
3955                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3956                unhold_lkb(lkb);
3957
3958        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3959                lkb->lkb_flags |= DLM_IFL_RESEND;
3960        }
3961
3962        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3963           conversions are async; there's no reply from the remote master */
3964}
3965
3966/* A waiting lkb needs recovery if the master node has failed, or
3967   the master node is changing (only when no directory is used) */
3968
3969static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3970{
3971        if (dlm_is_removed(ls, lkb->lkb_nodeid))
3972                return 1;
3973
3974        if (!dlm_no_directory(ls))
3975                return 0;
3976
3977        if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3978                return 1;
3979
3980        return 0;
3981}
3982
3983/* Recovery for locks that are waiting for replies from nodes that are now
3984   gone.  We can just complete unlocks and cancels by faking a reply from the
3985   dead node.  Requests and up-conversions we flag to be resent after
3986   recovery.  Down-conversions can just be completed with a fake reply like
3987   unlocks.  Conversions between PR and CW need special attention. */
3988
3989void dlm_recover_waiters_pre(struct dlm_ls *ls)
3990{
3991        struct dlm_lkb *lkb, *safe;
3992        int wait_type, stub_unlock_result, stub_cancel_result;
3993
3994        mutex_lock(&ls->ls_waiters_mutex);
3995
3996        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3997                log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3998                          lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3999
4000                /* all outstanding lookups, regardless of destination  will be
4001                   resent after recovery is done */
4002
4003                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4004                        lkb->lkb_flags |= DLM_IFL_RESEND;
4005                        continue;
4006                }
4007
4008                if (!waiter_needs_recovery(ls, lkb))
4009                        continue;
4010
4011                wait_type = lkb->lkb_wait_type;
4012                stub_unlock_result = -DLM_EUNLOCK;
4013                stub_cancel_result = -DLM_ECANCEL;
4014
4015                /* Main reply may have been received leaving a zero wait_type,
4016                   but a reply for the overlapping op may not have been
4017                   received.  In that case we need to fake the appropriate
4018                   reply for the overlap op. */
4019
4020                if (!wait_type) {
4021                        if (is_overlap_cancel(lkb)) {
4022                                wait_type = DLM_MSG_CANCEL;
4023                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4024                                        stub_cancel_result = 0;
4025                        }
4026                        if (is_overlap_unlock(lkb)) {
4027                                wait_type = DLM_MSG_UNLOCK;
4028                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4029                                        stub_unlock_result = -ENOENT;
4030                        }
4031
4032                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
4033                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
4034                                  stub_cancel_result, stub_unlock_result);
4035                }
4036
4037                switch (wait_type) {
4038
4039                case DLM_MSG_REQUEST:
4040                        lkb->lkb_flags |= DLM_IFL_RESEND;
4041                        break;
4042
4043                case DLM_MSG_CONVERT:
4044                        recover_convert_waiter(ls, lkb);
4045                        break;
4046
4047                case DLM_MSG_UNLOCK:
4048                        hold_lkb(lkb);
4049                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4050                        ls->ls_stub_ms.m_result = stub_unlock_result;
4051                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4052                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4053                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4054                        dlm_put_lkb(lkb);
4055                        break;
4056
4057                case DLM_MSG_CANCEL:
4058                        hold_lkb(lkb);
4059                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4060                        ls->ls_stub_ms.m_result = stub_cancel_result;
4061                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4062                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4063                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4064                        dlm_put_lkb(lkb);
4065                        break;
4066
4067                default:
4068                        log_error(ls, "invalid lkb wait_type %d %d",
4069                                  lkb->lkb_wait_type, wait_type);
4070                }
4071                schedule();
4072        }
4073        mutex_unlock(&ls->ls_waiters_mutex);
4074}
4075
4076static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4077{
4078        struct dlm_lkb *lkb;
4079        int found = 0;
4080
4081        mutex_lock(&ls->ls_waiters_mutex);
4082        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4083                if (lkb->lkb_flags & DLM_IFL_RESEND) {
4084                        hold_lkb(lkb);
4085                        found = 1;
4086                        break;
4087                }
4088        }
4089        mutex_unlock(&ls->ls_waiters_mutex);
4090
4091        if (!found)
4092                lkb = NULL;
4093        return lkb;
4094}
4095
4096/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4097   master or dir-node for r.  Processing the lkb may result in it being placed
4098   back on waiters. */
4099
4100/* We do this after normal locking has been enabled and any saved messages
4101   (in requestqueue) have been processed.  We should be confident that at
4102   this point we won't get or process a reply to any of these waiting
4103   operations.  But, new ops may be coming in on the rsbs/locks here from
4104   userspace or remotely. */
4105
4106/* there may have been an overlap unlock/cancel prior to recovery or after
4107   recovery.  if before, the lkb may still have a pos wait_count; if after, the
4108   overlap flag would just have been set and nothing new sent.  we can be
4109   confident here than any replies to either the initial op or overlap ops
4110   prior to recovery have been received. */
4111
4112int dlm_recover_waiters_post(struct dlm_ls *ls)
4113{
4114        struct dlm_lkb *lkb;
4115        struct dlm_rsb *r;
4116        int error = 0, mstype, err, oc, ou;
4117
4118        while (1) {
4119                if (dlm_locking_stopped(ls)) {
4120                        log_debug(ls, "recover_waiters_post aborted");
4121                        error = -EINTR;
4122                        break;
4123                }
4124
4125                lkb = find_resend_waiter(ls);
4126                if (!lkb)
4127                        break;
4128
4129                r = lkb->lkb_resource;
4130                hold_rsb(r);
4131                lock_rsb(r);
4132
4133                mstype = lkb->lkb_wait_type;
4134                oc = is_overlap_cancel(lkb);
4135                ou = is_overlap_unlock(lkb);
4136                err = 0;
4137
4138                log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4139                          lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4140
4141                /* At this point we assume that we won't get a reply to any
4142                   previous op or overlap op on this lock.  First, do a big
4143                   remove_from_waiters() for all previous ops. */
4144
4145                lkb->lkb_flags &= ~DLM_IFL_RESEND;
4146                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4147                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4148                lkb->lkb_wait_type = 0;
4149                lkb->lkb_wait_count = 0;
4150                mutex_lock(&ls->ls_waiters_mutex);
4151                list_del_init(&lkb->lkb_wait_reply);
4152                mutex_unlock(&ls->ls_waiters_mutex);
4153                unhold_lkb(lkb); /* for waiters list */
4154
4155                if (oc || ou) {
4156                        /* do an unlock or cancel instead of resending */
4157                        switch (mstype) {
4158                        case DLM_MSG_LOOKUP:
4159                        case DLM_MSG_REQUEST:
4160                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4161                                                        -DLM_ECANCEL);
4162                                unhold_lkb(lkb); /* undoes create_lkb() */
4163                                break;
4164                        case DLM_MSG_CONVERT:
4165                                if (oc) {
4166                                        queue_cast(r, lkb, -DLM_ECANCEL);
4167                                } else {
4168                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4169                                        _unlock_lock(r, lkb);
4170                                }
4171                                break;
4172                        default:
4173                                err = 1;
4174                        }
4175                } else {
4176                        switch (mstype) {
4177                        case DLM_MSG_LOOKUP:
4178                        case DLM_MSG_REQUEST:
4179                                _request_lock(r, lkb);
4180                                if (is_master(r))
4181                                        confirm_master(r, 0);
4182                                break;
4183                        case DLM_MSG_CONVERT:
4184                                _convert_lock(r, lkb);
4185                                break;
4186                        default:
4187                                err = 1;
4188                        }
4189                }
4190
4191                if (err)
4192                        log_error(ls, "recover_waiters_post %x %d %x %d %d",
4193                                  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4194                unlock_rsb(r);
4195                put_rsb(r);
4196                dlm_put_lkb(lkb);
4197        }
4198
4199        return error;
4200}
4201
4202static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4203                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4204{
4205        struct dlm_ls *ls = r->res_ls;
4206        struct dlm_lkb *lkb, *safe;
4207
4208        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4209                if (test(ls, lkb)) {
4210                        rsb_set_flag(r, RSB_LOCKS_PURGED);
4211                        del_lkb(r, lkb);
4212                        /* this put should free the lkb */
4213                        if (!dlm_put_lkb(lkb))
4214                                log_error(ls, "purged lkb not released");
4215                }
4216        }
4217}
4218
4219static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4220{
4221        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4222}
4223
/* purge_queue() test: selects every master-copy lkb.  The ls argument is
   unused; it is there to match the test callback signature. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
4228
4229static void purge_dead_locks(struct dlm_rsb *r)
4230{
4231        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4232        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4233        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4234}
4235
4236void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4237{
4238        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4239        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4240        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4241}
4242
4243/* Get rid of locks held by nodes that are gone. */
4244
4245int dlm_purge_locks(struct dlm_ls *ls)
4246{
4247        struct dlm_rsb *r;
4248
4249        log_debug(ls, "dlm_purge_locks");
4250
4251        down_write(&ls->ls_root_sem);
4252        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4253                hold_rsb(r);
4254                lock_rsb(r);
4255                if (is_master(r))
4256                        purge_dead_locks(r);
4257                unlock_rsb(r);
4258                unhold_rsb(r);
4259
4260                schedule();
4261        }
4262        up_write(&ls->ls_root_sem);
4263
4264        return 0;
4265}
4266
4267static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4268{
4269        struct dlm_rsb *r, *r_ret = NULL;
4270
4271        spin_lock(&ls->ls_rsbtbl[bucket].lock);
4272        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4273                if (!rsb_flag(r, RSB_LOCKS_PURGED))
4274                        continue;
4275                hold_rsb(r);
4276                rsb_clear_flag(r, RSB_LOCKS_PURGED);
4277                r_ret = r;
4278                break;
4279        }
4280        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4281        return r_ret;
4282}
4283
4284void dlm_grant_after_purge(struct dlm_ls *ls)
4285{
4286        struct dlm_rsb *r;
4287        int bucket = 0;
4288
4289        while (1) {
4290                r = find_purged_rsb(ls, bucket);
4291                if (!r) {
4292                        if (bucket == ls->ls_rsbtbl_size - 1)
4293                                break;
4294                        bucket++;
4295                        continue;
4296                }
4297                lock_rsb(r);
4298                if (is_master(r)) {
4299                        grant_pending_locks(r);
4300                        confirm_master(r, 0);
4301                }
4302                unlock_rsb(r);
4303                put_rsb(r);
4304                schedule();
4305        }
4306}
4307
4308static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4309                                         uint32_t remid)
4310{
4311        struct dlm_lkb *lkb;
4312
4313        list_for_each_entry(lkb, head, lkb_statequeue) {
4314                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4315                        return lkb;
4316        }
4317        return NULL;
4318}
4319
4320static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4321                                    uint32_t remid)
4322{
4323        struct dlm_lkb *lkb;
4324
4325        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4326        if (lkb)
4327                return lkb;
4328        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4329        if (lkb)
4330                return lkb;
4331        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4332        if (lkb)
4333                return lkb;
4334        return NULL;
4335}
4336
4337/* needs at least dlm_rcom + rcom_lock */
4338static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4339                                  struct dlm_rsb *r, struct dlm_rcom *rc)
4340{
4341        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4342
4343        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4344        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4345        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4346        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4347        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4348        lkb->lkb_flags |= DLM_IFL_MSTCPY;
4349        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4350        lkb->lkb_rqmode = rl->rl_rqmode;
4351        lkb->lkb_grmode = rl->rl_grmode;
4352        /* don't set lkb_status because add_lkb wants to itself */
4353
4354        lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4355        lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4356
4357        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4358                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4359                         sizeof(struct rcom_lock);
4360                if (lvblen > ls->ls_lvblen)
4361                        return -EINVAL;
4362                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4363                if (!lkb->lkb_lvbptr)
4364                        return -ENOMEM;
4365                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4366        }
4367
4368        /* Conversions between PR and CW (middle modes) need special handling.
4369           The real granted mode of these converting locks cannot be determined
4370           until all locks have been rebuilt on the rsb (recover_conversion) */
4371
4372        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4373            middle_conversion(lkb)) {
4374                rl->rl_status = DLM_LKSTS_CONVERT;
4375                lkb->lkb_grmode = DLM_LOCK_IV;
4376                rsb_set_flag(r, RSB_RECOVER_CONVERT);
4377        }
4378
4379        return 0;
4380}
4381
4382/* This lkb may have been recovered in a previous aborted recovery so we need
4383   to check if the rsb already has an lkb with the given remote nodeid/lkid.
4384   If so we just send back a standard reply.  If not, we create a new lkb with
4385   the given values and send back our lkid.  We send back our lkid by sending
4386   back the rcom_lock struct we got but with the remid field filled in. */
4387
4388/* needs at least dlm_rcom + rcom_lock */
4389int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4390{
4391        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4392        struct dlm_rsb *r;
4393        struct dlm_lkb *lkb;
4394        int error;
4395
4396        if (rl->rl_parent_lkid) {
4397                error = -EOPNOTSUPP;
4398                goto out;
4399        }
4400
4401        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4402                         R_MASTER, &r);
4403        if (error)
4404                goto out;
4405
4406        lock_rsb(r);
4407
4408        lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4409        if (lkb) {
4410                error = -EEXIST;
4411                goto out_remid;
4412        }
4413
4414        error = create_lkb(ls, &lkb);
4415        if (error)
4416                goto out_unlock;
4417
4418        error = receive_rcom_lock_args(ls, lkb, r, rc);
4419        if (error) {
4420                __put_lkb(ls, lkb);
4421                goto out_unlock;
4422        }
4423
4424        attach_lkb(r, lkb);
4425        add_lkb(r, lkb, rl->rl_status);
4426        error = 0;
4427
4428 out_remid:
4429        /* this is the new value returned to the lock holder for
4430           saving in its process-copy lkb */
4431        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4432
4433 out_unlock:
4434        unlock_rsb(r);
4435        put_rsb(r);
4436 out:
4437        if (error)
4438                log_debug(ls, "recover_master_copy %d %x", error,
4439                          le32_to_cpu(rl->rl_lkid));
4440        rl->rl_result = cpu_to_le32(error);
4441        return error;
4442}
4443
4444/* needs at least dlm_rcom + rcom_lock */
4445int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4446{
4447        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4448        struct dlm_rsb *r;
4449        struct dlm_lkb *lkb;
4450        int error;
4451
4452        error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4453        if (error) {
4454                log_error(ls, "recover_process_copy no lkid %x",
4455                                le32_to_cpu(rl->rl_lkid));
4456                return error;
4457        }
4458
4459        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4460
4461        error = le32_to_cpu(rl->rl_result);
4462
4463        r = lkb->lkb_resource;
4464        hold_rsb(r);
4465        lock_rsb(r);
4466
4467        switch (error) {
4468        case -EBADR:
4469                /* There's a chance the new master received our lock before
4470                   dlm_recover_master_reply(), this wouldn't happen if we did
4471                   a barrier between recover_masters and recover_locks. */
4472                log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4473                          (unsigned long)r, r->res_name);
4474                dlm_send_rcom_lock(r, lkb);
4475                goto out;
4476        case -EEXIST:
4477                log_debug(ls, "master copy exists %x", lkb->lkb_id);
4478                /* fall through */
4479        case 0:
4480                lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4481                break;
4482        default:
4483                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4484                          error, lkb->lkb_id);
4485        }
4486
4487        /* an ack for dlm_recover_locks() which waits for replies from
4488           all the locks it sends to new masters */
4489        dlm_recovered_lock(r);
4490 out:
4491        unlock_rsb(r);
4492        put_rsb(r);
4493        dlm_put_lkb(lkb);
4494
4495        return 0;
4496}
4497
4498int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4499                     int mode, uint32_t flags, void *name, unsigned int namelen,
4500                     unsigned long timeout_cs)
4501{
4502        struct dlm_lkb *lkb;
4503        struct dlm_args args;
4504        int error;
4505
4506        dlm_lock_recovery(ls);
4507
4508        error = create_lkb(ls, &lkb);
4509        if (error) {
4510                kfree(ua);
4511                goto out;
4512        }
4513
4514        if (flags & DLM_LKF_VALBLK) {
4515                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4516                if (!ua->lksb.sb_lvbptr) {
4517                        kfree(ua);
4518                        __put_lkb(ls, lkb);
4519                        error = -ENOMEM;
4520                        goto out;
4521                }
4522        }
4523
4524        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4525           When DLM_IFL_USER is set, the dlm knows that this is a userspace
4526           lock and that lkb_astparam is the dlm_user_args structure. */
4527
4528        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4529                              fake_astfn, ua, fake_bastfn, &args);
4530        lkb->lkb_flags |= DLM_IFL_USER;
4531        ua->old_mode = DLM_LOCK_IV;
4532
4533        if (error) {
4534                __put_lkb(ls, lkb);
4535                goto out;
4536        }
4537
4538        error = request_lock(ls, lkb, name, namelen, &args);
4539
4540        switch (error) {
4541        case 0:
4542                break;
4543        case -EINPROGRESS:
4544                error = 0;
4545                break;
4546        case -EAGAIN:
4547                error = 0;
4548                /* fall through */
4549        default:
4550                __put_lkb(ls, lkb);
4551                goto out;
4552        }
4553
4554        /* add this new lkb to the per-process list of locks */
4555        spin_lock(&ua->proc->locks_spin);
4556        hold_lkb(lkb);
4557        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4558        spin_unlock(&ua->proc->locks_spin);
4559 out:
4560        dlm_unlock_recovery(ls);
4561        return error;
4562}
4563
4564int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4565                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4566                     unsigned long timeout_cs)
4567{
4568        struct dlm_lkb *lkb;
4569        struct dlm_args args;
4570        struct dlm_user_args *ua;
4571        int error;
4572
4573        dlm_lock_recovery(ls);
4574
4575        error = find_lkb(ls, lkid, &lkb);
4576        if (error)
4577                goto out;
4578
4579        /* user can change the params on its lock when it converts it, or
4580           add an lvb that didn't exist before */
4581
4582        ua = lkb->lkb_ua;
4583
4584        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4585                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4586                if (!ua->lksb.sb_lvbptr) {
4587                        error = -ENOMEM;
4588                        goto out_put;
4589                }
4590        }
4591        if (lvb_in && ua->lksb.sb_lvbptr)
4592                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4593
4594        ua->xid = ua_tmp->xid;
4595        ua->castparam = ua_tmp->castparam;
4596        ua->castaddr = ua_tmp->castaddr;
4597        ua->bastparam = ua_tmp->bastparam;
4598        ua->bastaddr = ua_tmp->bastaddr;
4599        ua->user_lksb = ua_tmp->user_lksb;
4600        ua->old_mode = lkb->lkb_grmode;
4601
4602        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4603                              fake_astfn, ua, fake_bastfn, &args);
4604        if (error)
4605                goto out_put;
4606
4607        error = convert_lock(ls, lkb, &args);
4608
4609        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4610                error = 0;
4611 out_put:
4612        dlm_put_lkb(lkb);
4613 out:
4614        dlm_unlock_recovery(ls);
4615        kfree(ua_tmp);
4616        return error;
4617}
4618
4619int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4620                    uint32_t flags, uint32_t lkid, char *lvb_in)
4621{
4622        struct dlm_lkb *lkb;
4623        struct dlm_args args;
4624        struct dlm_user_args *ua;
4625        int error;
4626
4627        dlm_lock_recovery(ls);
4628
4629        error = find_lkb(ls, lkid, &lkb);
4630        if (error)
4631                goto out;
4632
4633        ua = lkb->lkb_ua;
4634
4635        if (lvb_in && ua->lksb.sb_lvbptr)
4636                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4637        if (ua_tmp->castparam)
4638                ua->castparam = ua_tmp->castparam;
4639        ua->user_lksb = ua_tmp->user_lksb;
4640
4641        error = set_unlock_args(flags, ua, &args);
4642        if (error)
4643                goto out_put;
4644
4645        error = unlock_lock(ls, lkb, &args);
4646
4647        if (error == -DLM_EUNLOCK)
4648                error = 0;
4649        /* from validate_unlock_args() */
4650        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4651                error = 0;
4652        if (error)
4653                goto out_put;
4654
4655        spin_lock(&ua->proc->locks_spin);
4656        /* dlm_user_add_ast() may have already taken lkb off the proc list */
4657        if (!list_empty(&lkb->lkb_ownqueue))
4658                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4659        spin_unlock(&ua->proc->locks_spin);
4660 out_put:
4661        dlm_put_lkb(lkb);
4662 out:
4663        dlm_unlock_recovery(ls);
4664        kfree(ua_tmp);
4665        return error;
4666}
4667
4668int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4669                    uint32_t flags, uint32_t lkid)
4670{
4671        struct dlm_lkb *lkb;
4672        struct dlm_args args;
4673        struct dlm_user_args *ua;
4674        int error;
4675
4676        dlm_lock_recovery(ls);
4677
4678        error = find_lkb(ls, lkid, &lkb);
4679        if (error)
4680                goto out;
4681
4682        ua = lkb->lkb_ua;
4683        if (ua_tmp->castparam)
4684                ua->castparam = ua_tmp->castparam;
4685        ua->user_lksb = ua_tmp->user_lksb;
4686
4687        error = set_unlock_args(flags, ua, &args);
4688        if (error)
4689                goto out_put;
4690
4691        error = cancel_lock(ls, lkb, &args);
4692
4693        if (error == -DLM_ECANCEL)
4694                error = 0;
4695        /* from validate_unlock_args() */
4696        if (error == -EBUSY)
4697                error = 0;
4698 out_put:
4699        dlm_put_lkb(lkb);
4700 out:
4701        dlm_unlock_recovery(ls);
4702        kfree(ua_tmp);
4703        return error;
4704}
4705
4706int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4707{
4708        struct dlm_lkb *lkb;
4709        struct dlm_args args;
4710        struct dlm_user_args *ua;
4711        struct dlm_rsb *r;
4712        int error;
4713
4714        dlm_lock_recovery(ls);
4715
4716        error = find_lkb(ls, lkid, &lkb);
4717        if (error)
4718                goto out;
4719
4720        ua = lkb->lkb_ua;
4721
4722        error = set_unlock_args(flags, ua, &args);
4723        if (error)
4724                goto out_put;
4725
4726        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4727
4728        r = lkb->lkb_resource;
4729        hold_rsb(r);
4730        lock_rsb(r);
4731
4732        error = validate_unlock_args(lkb, &args);
4733        if (error)
4734                goto out_r;
4735        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4736
4737        error = _cancel_lock(r, lkb);
4738 out_r:
4739        unlock_rsb(r);
4740        put_rsb(r);
4741
4742        if (error == -DLM_ECANCEL)
4743                error = 0;
4744        /* from validate_unlock_args() */
4745        if (error == -EBUSY)
4746                error = 0;
4747 out_put:
4748        dlm_put_lkb(lkb);
4749 out:
4750        dlm_unlock_recovery(ls);
4751        return error;
4752}
4753
4754/* lkb's that are removed from the waiters list by revert are just left on the
4755   orphans list with the granted orphan locks, to be freed by purge */
4756
4757static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4758{
4759        struct dlm_args args;
4760        int error;
4761
4762        hold_lkb(lkb);
4763        mutex_lock(&ls->ls_orphans_mutex);
4764        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4765        mutex_unlock(&ls->ls_orphans_mutex);
4766
4767        set_unlock_args(0, lkb->lkb_ua, &args);
4768
4769        error = cancel_lock(ls, lkb, &args);
4770        if (error == -DLM_ECANCEL)
4771                error = 0;
4772        return error;
4773}
4774
4775/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4776   Regardless of what rsb queue the lock is on, it's removed and freed. */
4777
4778static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4779{
4780        struct dlm_args args;
4781        int error;
4782
4783        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4784
4785        error = unlock_lock(ls, lkb, &args);
4786        if (error == -DLM_EUNLOCK)
4787                error = 0;
4788        return error;
4789}
4790
4791/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4792   (which does lock_rsb) due to deadlock with receiving a message that does
4793   lock_rsb followed by dlm_user_add_ast() */
4794
4795static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4796                                     struct dlm_user_proc *proc)
4797{
4798        struct dlm_lkb *lkb = NULL;
4799
4800        mutex_lock(&ls->ls_clear_proc_locks);
4801        if (list_empty(&proc->locks))
4802                goto out;
4803
4804        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4805        list_del_init(&lkb->lkb_ownqueue);
4806
4807        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4808                lkb->lkb_flags |= DLM_IFL_ORPHAN;
4809        else
4810                lkb->lkb_flags |= DLM_IFL_DEAD;
4811 out:
4812        mutex_unlock(&ls->ls_clear_proc_locks);
4813        return lkb;
4814}
4815
4816/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4817   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4818   which we clear here. */
4819
4820/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4821   list, and no more device_writes should add lkb's to proc->locks list; so we
4822   shouldn't need to take asts_spin or locks_spin here.  this assumes that
4823   device reads/writes/closes are serialized -- FIXME: we may need to serialize
4824   them ourself. */
4825
4826void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4827{
4828        struct dlm_lkb *lkb, *safe;
4829
4830        dlm_lock_recovery(ls);
4831
4832        while (1) {
4833                lkb = del_proc_lock(ls, proc);
4834                if (!lkb)
4835                        break;
4836                del_timeout(lkb);
4837                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4838                        orphan_proc_lock(ls, lkb);
4839                else
4840                        unlock_proc_lock(ls, lkb);
4841
4842                /* this removes the reference for the proc->locks list
4843                   added by dlm_user_request, it may result in the lkb
4844                   being freed */
4845
4846                dlm_put_lkb(lkb);
4847        }
4848
4849        mutex_lock(&ls->ls_clear_proc_locks);
4850
4851        /* in-progress unlocks */
4852        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4853                list_del_init(&lkb->lkb_ownqueue);
4854                lkb->lkb_flags |= DLM_IFL_DEAD;
4855                dlm_put_lkb(lkb);
4856        }
4857
4858        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4859                lkb->lkb_ast_type = 0;
4860                list_del(&lkb->lkb_astqueue);
4861                dlm_put_lkb(lkb);
4862        }
4863
4864        mutex_unlock(&ls->ls_clear_proc_locks);
4865        dlm_unlock_recovery(ls);
4866}
4867
4868static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4869{
4870        struct dlm_lkb *lkb, *safe;
4871
4872        while (1) {
4873                lkb = NULL;
4874                spin_lock(&proc->locks_spin);
4875                if (!list_empty(&proc->locks)) {
4876                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
4877                                         lkb_ownqueue);
4878                        list_del_init(&lkb->lkb_ownqueue);
4879                }
4880                spin_unlock(&proc->locks_spin);
4881
4882                if (!lkb)
4883                        break;
4884
4885                lkb->lkb_flags |= DLM_IFL_DEAD;
4886                unlock_proc_lock(ls, lkb);
4887                dlm_put_lkb(lkb); /* ref from proc->locks list */
4888        }
4889
4890        spin_lock(&proc->locks_spin);
4891        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4892                list_del_init(&lkb->lkb_ownqueue);
4893                lkb->lkb_flags |= DLM_IFL_DEAD;
4894                dlm_put_lkb(lkb);
4895        }
4896        spin_unlock(&proc->locks_spin);
4897
4898        spin_lock(&proc->asts_spin);
4899        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4900                list_del(&lkb->lkb_astqueue);
4901                dlm_put_lkb(lkb);
4902        }
4903        spin_unlock(&proc->asts_spin);
4904}
4905
4906/* pid of 0 means purge all orphans */
4907
4908static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4909{
4910        struct dlm_lkb *lkb, *safe;
4911
4912        mutex_lock(&ls->ls_orphans_mutex);
4913        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4914                if (pid && lkb->lkb_ownpid != pid)
4915                        continue;
4916                unlock_proc_lock(ls, lkb);
4917                list_del_init(&lkb->lkb_ownqueue);
4918                dlm_put_lkb(lkb);
4919        }
4920        mutex_unlock(&ls->ls_orphans_mutex);
4921}
4922
4923static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4924{
4925        struct dlm_message *ms;
4926        struct dlm_mhandle *mh;
4927        int error;
4928
4929        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4930                                DLM_MSG_PURGE, &ms, &mh);
4931        if (error)
4932                return error;
4933        ms->m_nodeid = nodeid;
4934        ms->m_pid = pid;
4935
4936        return send_message(mh, ms);
4937}
4938
4939int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4940                   int nodeid, int pid)
4941{
4942        int error = 0;
4943
4944        if (nodeid != dlm_our_nodeid()) {
4945                error = send_purge(ls, nodeid, pid);
4946        } else {
4947                dlm_lock_recovery(ls);
4948                if (pid == current->pid)
4949                        purge_proc_locks(ls, proc);
4950                else
4951                        do_purge(ls, nodeid, pid);
4952                dlm_unlock_recovery(ls);
4953        }
4954        return error;
4955}
4956
4957
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.