linux/fs/dlm/lock.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include <linux/slab.h>
  60#include "dlm_internal.h"
  61#include <linux/dlm_device.h>
  62#include "memory.h"
  63#include "lowcomms.h"
  64#include "requestqueue.h"
  65#include "util.h"
  66#include "dir.h"
  67#include "member.h"
  68#include "lockspace.h"
  69#include "ast.h"
  70#include "lock.h"
  71#include "rcom.h"
  72#include "recover.h"
  73#include "lvb_table.h"
  74#include "user.h"
  75#include "config.h"
  76
  77static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  78static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  79static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  80static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  81static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  82static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  83static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  84static int send_remove(struct dlm_rsb *r);
  85static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  86static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  87static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
  88                                    struct dlm_message *ms);
  89static int receive_extralen(struct dlm_message *ms);
  90static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  91static void del_timeout(struct dlm_lkb *lkb);
  92
  93/*
   94 * Lock compatibility matrix - thanks Steve
  95 * UN = Unlocked state. Not really a state, used as a flag
  96 * PD = Padding. Used to make the matrix a nice power of two in size
  97 * Other states are the same as the VMS DLM.
  98 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
  99 */
 100
 101static const int __dlm_compat_matrix[8][8] = {
 102      /* UN NL CR CW PR PW EX PD */
 103        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 104        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 105        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 106        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 107        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 108        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 109        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 110        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 111};
 112
 113/*
 114 * This defines the direction of transfer of LVB data.
 115 * Granted mode is the row; requested mode is the column.
 116 * Usage: matrix[grmode+1][rqmode+1]
 117 * 1 = LVB is returned to the caller
 118 * 0 = LVB is written to the resource
 119 * -1 = nothing happens to the LVB
 120 */
 121
 122const int dlm_lvb_operations[8][8] = {
 123        /* UN   NL  CR  CW  PR  PW  EX  PD*/
 124        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
 125        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
 126        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
 127        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
 128        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
 129        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
 130        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
 131        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
 132};
 133
 134#define modes_compat(gr, rq) \
 135        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
 136
 137int dlm_modes_compat(int mode1, int mode2)
 138{
 139        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 140}
 141
 142/*
 143 * Compatibility matrix for conversions with QUECVT set.
 144 * Granted mode is the row; requested mode is the column.
 145 * Usage: matrix[grmode+1][rqmode+1]
 146 */
 147
 148static const int __quecvt_compat_matrix[8][8] = {
 149      /* UN NL CR CW PR PW EX PD */
 150        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
 151        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
 152        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
 153        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
 154        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
 155        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
 156        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
 157        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 158};
 159
 160void dlm_print_lkb(struct dlm_lkb *lkb)
 161{
 162        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 163               "     status %d rqmode %d grmode %d wait_type %d\n",
 164               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 165               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 166               lkb->lkb_grmode, lkb->lkb_wait_type);
 167}
 168
 169static void dlm_print_rsb(struct dlm_rsb *r)
 170{
 171        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
 172               r->res_nodeid, r->res_flags, r->res_first_lkid,
 173               r->res_recover_locks_count, r->res_name);
 174}
 175
 176void dlm_dump_rsb(struct dlm_rsb *r)
 177{
 178        struct dlm_lkb *lkb;
 179
 180        dlm_print_rsb(r);
 181
 182        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 183               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 184        printk(KERN_ERR "rsb lookup list\n");
 185        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 186                dlm_print_lkb(lkb);
 187        printk(KERN_ERR "rsb grant queue:\n");
 188        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 189                dlm_print_lkb(lkb);
 190        printk(KERN_ERR "rsb convert queue:\n");
 191        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 192                dlm_print_lkb(lkb);
 193        printk(KERN_ERR "rsb wait queue:\n");
 194        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 195                dlm_print_lkb(lkb);
 196}
 197
 198/* Threads cannot use the lockspace while it's being recovered */
 199
 200static inline void dlm_lock_recovery(struct dlm_ls *ls)
 201{
 202        down_read(&ls->ls_in_recovery);
 203}
 204
 205void dlm_unlock_recovery(struct dlm_ls *ls)
 206{
 207        up_read(&ls->ls_in_recovery);
 208}
 209
 210int dlm_lock_recovery_try(struct dlm_ls *ls)
 211{
 212        return down_read_trylock(&ls->ls_in_recovery);
 213}
 214
 215static inline int can_be_queued(struct dlm_lkb *lkb)
 216{
 217        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 218}
 219
 220static inline int force_blocking_asts(struct dlm_lkb *lkb)
 221{
 222        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 223}
 224
 225static inline int is_demoted(struct dlm_lkb *lkb)
 226{
 227        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 228}
 229
 230static inline int is_altmode(struct dlm_lkb *lkb)
 231{
 232        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 233}
 234
 235static inline int is_granted(struct dlm_lkb *lkb)
 236{
 237        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 238}
 239
 240static inline int is_remote(struct dlm_rsb *r)
 241{
 242        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 243        return !!r->res_nodeid;
 244}
 245
 246static inline int is_process_copy(struct dlm_lkb *lkb)
 247{
 248        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 249}
 250
 251static inline int is_master_copy(struct dlm_lkb *lkb)
 252{
 253        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
 254                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 255        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 256}
 257
 258static inline int middle_conversion(struct dlm_lkb *lkb)
 259{
 260        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 261            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 262                return 1;
 263        return 0;
 264}
 265
 266static inline int down_conversion(struct dlm_lkb *lkb)
 267{
 268        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 269}
 270
 271static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 272{
 273        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 274}
 275
 276static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 277{
 278        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 279}
 280
 281static inline int is_overlap(struct dlm_lkb *lkb)
 282{
 283        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 284                                  DLM_IFL_OVERLAP_CANCEL));
 285}
 286
 287static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 288{
 289        if (is_master_copy(lkb))
 290                return;
 291
 292        del_timeout(lkb);
 293
 294        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 295
 296        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 297           timeout caused the cancel then return -ETIMEDOUT */
 298        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 299                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 300                rv = -ETIMEDOUT;
 301        }
 302
 303        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 304                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 305                rv = -EDEADLK;
 306        }
 307
 308        dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
 309}
 310
 311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 312{
 313        queue_cast(r, lkb,
 314                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 315}
 316
 317static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 318{
 319        if (is_master_copy(lkb)) {
 320                send_bast(r, lkb, rqmode);
 321        } else {
 322                dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
 323        }
 324}
 325
 326/*
 327 * Basic operations on rsb's and lkb's
 328 */
 329
 330static int pre_rsb_struct(struct dlm_ls *ls)
 331{
 332        struct dlm_rsb *r1, *r2;
 333        int count = 0;
 334
 335        spin_lock(&ls->ls_new_rsb_spin);
 336        if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
 337                spin_unlock(&ls->ls_new_rsb_spin);
 338                return 0;
 339        }
 340        spin_unlock(&ls->ls_new_rsb_spin);
 341
 342        r1 = dlm_allocate_rsb(ls);
 343        r2 = dlm_allocate_rsb(ls);
 344
 345        spin_lock(&ls->ls_new_rsb_spin);
 346        if (r1) {
 347                list_add(&r1->res_hashchain, &ls->ls_new_rsb);
 348                ls->ls_new_rsb_count++;
 349        }
 350        if (r2) {
 351                list_add(&r2->res_hashchain, &ls->ls_new_rsb);
 352                ls->ls_new_rsb_count++;
 353        }
 354        count = ls->ls_new_rsb_count;
 355        spin_unlock(&ls->ls_new_rsb_spin);
 356
 357        if (!count)
 358                return -ENOMEM;
 359        return 0;
 360}
 361
 362/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
 363   unlock any spinlocks, go back and call pre_rsb_struct again.
 364   Otherwise, take an rsb off the list and return it. */
 365
 366static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
 367                          struct dlm_rsb **r_ret)
 368{
 369        struct dlm_rsb *r;
 370        int count;
 371
 372        spin_lock(&ls->ls_new_rsb_spin);
 373        if (list_empty(&ls->ls_new_rsb)) {
 374                count = ls->ls_new_rsb_count;
 375                spin_unlock(&ls->ls_new_rsb_spin);
 376                log_debug(ls, "find_rsb retry %d %d %s",
 377                          count, dlm_config.ci_new_rsb_count, name);
 378                return -EAGAIN;
 379        }
 380
 381        r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 382        list_del(&r->res_hashchain);
 383        ls->ls_new_rsb_count--;
 384        spin_unlock(&ls->ls_new_rsb_spin);
 385
 386        r->res_ls = ls;
 387        r->res_length = len;
 388        memcpy(r->res_name, name, len);
 389        mutex_init(&r->res_mutex);
 390
 391        INIT_LIST_HEAD(&r->res_hashchain);
 392        INIT_LIST_HEAD(&r->res_lookup);
 393        INIT_LIST_HEAD(&r->res_grantqueue);
 394        INIT_LIST_HEAD(&r->res_convertqueue);
 395        INIT_LIST_HEAD(&r->res_waitqueue);
 396        INIT_LIST_HEAD(&r->res_root_list);
 397        INIT_LIST_HEAD(&r->res_recover_list);
 398
 399        *r_ret = r;
 400        return 0;
 401}
 402
 403static int search_rsb_list(struct list_head *head, char *name, int len,
 404                           unsigned int flags, struct dlm_rsb **r_ret)
 405{
 406        struct dlm_rsb *r;
 407        int error = 0;
 408
 409        list_for_each_entry(r, head, res_hashchain) {
 410                if (len == r->res_length && !memcmp(name, r->res_name, len))
 411                        goto found;
 412        }
 413        *r_ret = NULL;
 414        return -EBADR;
 415
 416 found:
 417        if (r->res_nodeid && (flags & R_MASTER))
 418                error = -ENOTBLK;
 419        *r_ret = r;
 420        return error;
 421}
 422
 423static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 424                       unsigned int flags, struct dlm_rsb **r_ret)
 425{
 426        struct dlm_rsb *r;
 427        int error;
 428
 429        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
 430        if (!error) {
 431                kref_get(&r->res_ref);
 432                goto out;
 433        }
 434        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 435        if (error)
 436                goto out;
 437
 438        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
 439
 440        if (dlm_no_directory(ls))
 441                goto out;
 442
 443        if (r->res_nodeid == -1) {
 444                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 445                r->res_first_lkid = 0;
 446        } else if (r->res_nodeid > 0) {
 447                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 448                r->res_first_lkid = 0;
 449        } else {
 450                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
 451                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
 452        }
 453 out:
 454        *r_ret = r;
 455        return error;
 456}
 457
 458/*
 459 * Find rsb in rsbtbl and potentially create/add one
 460 *
 461 * Delaying the release of rsb's has a similar benefit to applications keeping
 462 * NL locks on an rsb, but without the guarantee that the cached master value
 463 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 464 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 465 * to excessive master lookups and removals if we don't delay the release.
 466 *
 467 * Searching for an rsb means looking through both the normal list and toss
 468 * list.  When found on the toss list the rsb is moved to the normal list with
 469 * ref count of 1; when found on normal list the ref count is incremented.
 470 */
 471
 472static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 473                    unsigned int flags, struct dlm_rsb **r_ret)
 474{
 475        struct dlm_rsb *r = NULL;
 476        uint32_t hash, bucket;
 477        int error;
 478
 479        if (namelen > DLM_RESNAME_MAXLEN) {
 480                error = -EINVAL;
 481                goto out;
 482        }
 483
 484        if (dlm_no_directory(ls))
 485                flags |= R_CREATE;
 486
 487        hash = jhash(name, namelen, 0);
 488        bucket = hash & (ls->ls_rsbtbl_size - 1);
 489
 490 retry:
 491        if (flags & R_CREATE) {
 492                error = pre_rsb_struct(ls);
 493                if (error < 0)
 494                        goto out;
 495        }
 496
 497        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 498
 499        error = _search_rsb(ls, name, namelen, bucket, flags, &r);
 500        if (!error)
 501                goto out_unlock;
 502
 503        if (error == -EBADR && !(flags & R_CREATE))
 504                goto out_unlock;
 505
 506        /* the rsb was found but wasn't a master copy */
 507        if (error == -ENOTBLK)
 508                goto out_unlock;
 509
 510        error = get_rsb_struct(ls, name, namelen, &r);
 511        if (error == -EAGAIN) {
 512                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 513                goto retry;
 514        }
 515        if (error)
 516                goto out_unlock;
 517
 518        r->res_hash = hash;
 519        r->res_bucket = bucket;
 520        r->res_nodeid = -1;
 521        kref_init(&r->res_ref);
 522
 523        /* With no directory, the master can be set immediately */
 524        if (dlm_no_directory(ls)) {
 525                int nodeid = dlm_dir_nodeid(r);
 526                if (nodeid == dlm_our_nodeid())
 527                        nodeid = 0;
 528                r->res_nodeid = nodeid;
 529        }
 530        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
 531        error = 0;
 532 out_unlock:
 533        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 534 out:
 535        *r_ret = r;
 536        return error;
 537}
 538
 539/* This is only called to add a reference when the code already holds
 540   a valid reference to the rsb, so there's no need for locking. */
 541
 542static inline void hold_rsb(struct dlm_rsb *r)
 543{
 544        kref_get(&r->res_ref);
 545}
 546
 547void dlm_hold_rsb(struct dlm_rsb *r)
 548{
 549        hold_rsb(r);
 550}
 551
 552static void toss_rsb(struct kref *kref)
 553{
 554        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 555        struct dlm_ls *ls = r->res_ls;
 556
 557        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 558        kref_init(&r->res_ref);
 559        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
 560        r->res_toss_time = jiffies;
 561        if (r->res_lvbptr) {
 562                dlm_free_lvb(r->res_lvbptr);
 563                r->res_lvbptr = NULL;
 564        }
 565}
 566
 567/* When all references to the rsb are gone it's transferred to
 568   the tossed list for later disposal. */
 569
 570static void put_rsb(struct dlm_rsb *r)
 571{
 572        struct dlm_ls *ls = r->res_ls;
 573        uint32_t bucket = r->res_bucket;
 574
 575        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 576        kref_put(&r->res_ref, toss_rsb);
 577        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 578}
 579
 580void dlm_put_rsb(struct dlm_rsb *r)
 581{
 582        put_rsb(r);
 583}
 584
 585/* See comment for unhold_lkb */
 586
 587static void unhold_rsb(struct dlm_rsb *r)
 588{
 589        int rv;
 590        rv = kref_put(&r->res_ref, toss_rsb);
 591        DLM_ASSERT(!rv, dlm_dump_rsb(r););
 592}
 593
 594static void kill_rsb(struct kref *kref)
 595{
 596        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 597
 598        /* All work is done after the return from kref_put() so we
 599           can release the write_lock before the remove and free. */
 600
 601        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 602        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 603        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 604        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 605        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 606        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 607}
 608
 609/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
 610   The rsb must exist as long as any lkb's for it do. */
 611
 612static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 613{
 614        hold_rsb(r);
 615        lkb->lkb_resource = r;
 616}
 617
 618static void detach_lkb(struct dlm_lkb *lkb)
 619{
 620        if (lkb->lkb_resource) {
 621                put_rsb(lkb->lkb_resource);
 622                lkb->lkb_resource = NULL;
 623        }
 624}
 625
 626static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 627{
 628        struct dlm_lkb *lkb;
 629        int rv, id;
 630
 631        lkb = dlm_allocate_lkb(ls);
 632        if (!lkb)
 633                return -ENOMEM;
 634
 635        lkb->lkb_nodeid = -1;
 636        lkb->lkb_grmode = DLM_LOCK_IV;
 637        kref_init(&lkb->lkb_ref);
 638        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 639        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 640        INIT_LIST_HEAD(&lkb->lkb_time_list);
 641        INIT_LIST_HEAD(&lkb->lkb_cb_list);
 642        mutex_init(&lkb->lkb_cb_mutex);
 643        INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 644
 645 retry:
 646        rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
 647        if (!rv)
 648                return -ENOMEM;
 649
 650        spin_lock(&ls->ls_lkbidr_spin);
 651        rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
 652        if (!rv)
 653                lkb->lkb_id = id;
 654        spin_unlock(&ls->ls_lkbidr_spin);
 655
 656        if (rv == -EAGAIN)
 657                goto retry;
 658
 659        if (rv < 0) {
 660                log_error(ls, "create_lkb idr error %d", rv);
 661                return rv;
 662        }
 663
 664        *lkb_ret = lkb;
 665        return 0;
 666}
 667
 668static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 669{
 670        struct dlm_lkb *lkb;
 671
 672        spin_lock(&ls->ls_lkbidr_spin);
 673        lkb = idr_find(&ls->ls_lkbidr, lkid);
 674        if (lkb)
 675                kref_get(&lkb->lkb_ref);
 676        spin_unlock(&ls->ls_lkbidr_spin);
 677
 678        *lkb_ret = lkb;
 679        return lkb ? 0 : -ENOENT;
 680}
 681
 682static void kill_lkb(struct kref *kref)
 683{
 684        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
 685
 686        /* All work is done after the return from kref_put() so we
 687           can release the write_lock before the detach_lkb */
 688
 689        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 690}
 691
 692/* __put_lkb() is used when an lkb may not have an rsb attached to
 693   it so we need to provide the lockspace explicitly */
 694
 695static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 696{
 697        uint32_t lkid = lkb->lkb_id;
 698
 699        spin_lock(&ls->ls_lkbidr_spin);
 700        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
 701                idr_remove(&ls->ls_lkbidr, lkid);
 702                spin_unlock(&ls->ls_lkbidr_spin);
 703
 704                detach_lkb(lkb);
 705
 706                /* for local/process lkbs, lvbptr points to caller's lksb */
 707                if (lkb->lkb_lvbptr && is_master_copy(lkb))
 708                        dlm_free_lvb(lkb->lkb_lvbptr);
 709                dlm_free_lkb(lkb);
 710                return 1;
 711        } else {
 712                spin_unlock(&ls->ls_lkbidr_spin);
 713                return 0;
 714        }
 715}
 716
 717int dlm_put_lkb(struct dlm_lkb *lkb)
 718{
 719        struct dlm_ls *ls;
 720
 721        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
 722        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
 723
 724        ls = lkb->lkb_resource->res_ls;
 725        return __put_lkb(ls, lkb);
 726}
 727
 728/* This is only called to add a reference when the code already holds
 729   a valid reference to the lkb, so there's no need for locking. */
 730
 731static inline void hold_lkb(struct dlm_lkb *lkb)
 732{
 733        kref_get(&lkb->lkb_ref);
 734}
 735
 736/* This is called when we need to remove a reference and are certain
 737   it's not the last ref.  e.g. del_lkb is always called between a
 738   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
 739   put_lkb would work fine, but would involve unnecessary locking */
 740
 741static inline void unhold_lkb(struct dlm_lkb *lkb)
 742{
 743        int rv;
 744        rv = kref_put(&lkb->lkb_ref, kill_lkb);
 745        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
 746}
 747
 748static void lkb_add_ordered(struct list_head *new, struct list_head *head,
 749                            int mode)
 750{
 751        struct dlm_lkb *lkb = NULL;
 752
 753        list_for_each_entry(lkb, head, lkb_statequeue)
 754                if (lkb->lkb_rqmode < mode)
 755                        break;
 756
 757        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 758}
 759
 760/* add/remove lkb to rsb's grant/convert/wait queue */
 761
 762static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 763{
 764        kref_get(&lkb->lkb_ref);
 765
 766        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 767
 768        lkb->lkb_timestamp = ktime_get();
 769
 770        lkb->lkb_status = status;
 771
 772        switch (status) {
 773        case DLM_LKSTS_WAITING:
 774                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 775                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 776                else
 777                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 778                break;
 779        case DLM_LKSTS_GRANTED:
 780                /* convention says granted locks kept in order of grmode */
 781                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 782                                lkb->lkb_grmode);
 783                break;
 784        case DLM_LKSTS_CONVERT:
 785                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 786                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 787                else
 788                        list_add_tail(&lkb->lkb_statequeue,
 789                                      &r->res_convertqueue);
 790                break;
 791        default:
 792                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
 793        }
 794}
 795
 796static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 797{
 798        lkb->lkb_status = 0;
 799        list_del(&lkb->lkb_statequeue);
 800        unhold_lkb(lkb);
 801}
 802
 803static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
 804{
 805        hold_lkb(lkb);
 806        del_lkb(r, lkb);
 807        add_lkb(r, lkb, sts);
 808        unhold_lkb(lkb);
 809}
 810
 811static int msg_reply_type(int mstype)
 812{
 813        switch (mstype) {
 814        case DLM_MSG_REQUEST:
 815                return DLM_MSG_REQUEST_REPLY;
 816        case DLM_MSG_CONVERT:
 817                return DLM_MSG_CONVERT_REPLY;
 818        case DLM_MSG_UNLOCK:
 819                return DLM_MSG_UNLOCK_REPLY;
 820        case DLM_MSG_CANCEL:
 821                return DLM_MSG_CANCEL_REPLY;
 822        case DLM_MSG_LOOKUP:
 823                return DLM_MSG_LOOKUP_REPLY;
 824        }
 825        return -1;
 826}
 827
 828static int nodeid_warned(int nodeid, int num_nodes, int *warned)
 829{
 830        int i;
 831
 832        for (i = 0; i < num_nodes; i++) {
 833                if (!warned[i]) {
 834                        warned[i] = nodeid;
 835                        return 0;
 836                }
 837                if (warned[i] == nodeid)
 838                        return 1;
 839        }
 840        return 0;
 841}
 842
 843void dlm_scan_waiters(struct dlm_ls *ls)
 844{
 845        struct dlm_lkb *lkb;
 846        ktime_t zero = ktime_set(0, 0);
 847        s64 us;
 848        s64 debug_maxus = 0;
 849        u32 debug_scanned = 0;
 850        u32 debug_expired = 0;
 851        int num_nodes = 0;
 852        int *warned = NULL;
 853
 854        if (!dlm_config.ci_waitwarn_us)
 855                return;
 856
 857        mutex_lock(&ls->ls_waiters_mutex);
 858
 859        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
 860                if (ktime_equal(lkb->lkb_wait_time, zero))
 861                        continue;
 862
 863                debug_scanned++;
 864
 865                us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
 866
 867                if (us < dlm_config.ci_waitwarn_us)
 868                        continue;
 869
 870                lkb->lkb_wait_time = zero;
 871
 872                debug_expired++;
 873                if (us > debug_maxus)
 874                        debug_maxus = us;
 875
 876                if (!num_nodes) {
 877                        num_nodes = ls->ls_num_nodes;
 878                        warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
 879                }
 880                if (!warned)
 881                        continue;
 882                if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
 883                        continue;
 884
 885                log_error(ls, "waitwarn %x %lld %d us check connection to "
 886                          "node %d", lkb->lkb_id, (long long)us,
 887                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
 888        }
 889        mutex_unlock(&ls->ls_waiters_mutex);
 890        kfree(warned);
 891
 892        if (debug_expired)
 893                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
 894                          debug_scanned, debug_expired,
 895                          dlm_config.ci_waitwarn_us, (long long)debug_maxus);
 896}
 897
 898/* add/remove lkb from global waiters list of lkb's waiting for
 899   a reply from a remote node */
 900
/*
 * Record that lkb is waiting for a reply to a message of type mstype that
 * is being sent to to_nodeid.  All state is changed under ls_waiters_mutex.
 *
 * Returns 0 on success,
 *         -EINVAL if an unlock already overlaps the lkb, or a cancel already
 *                 overlaps it and another cancel is being added,
 *         -EBUSY  if a reply is still outstanding and mstype is neither
 *                 DLM_MSG_UNLOCK nor DLM_MSG_CANCEL.
 */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        /* nothing more may be stacked on top of an overlapping unlock,
           and a second cancel may not be stacked on an overlapping cancel */
        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        /* a reply is already outstanding: only an unlock or cancel may
           overlap it.  The overlap is recorded as a flag plus an extra
           wait_count/reference; wait_type and the list entry are left
           untouched. */
        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        /* no wait in progress, so the count must be zero here */
        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        /* first wait on this lkb: take a reference and put it on the
           lockspace waiters list */
        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_time = ktime_get();
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
 953
 954/* We clear the RESEND flag because we might be taking an lkb off the waiters
 955   list as part of process_requestqueue (e.g. a lookup that has an optimized
 956   request reply on the requestqueue) between dlm_recover_waiters_pre() which
 957   set RESEND and dlm_recover_waiters_post() */
 958
/*
 * Account for a reply of type mstype on lkb: drop one wait_count and the
 * reference that went with it, and take the lkb off the waiters list once
 * the count reaches zero.  This function takes no lock itself; the visible
 * callers either hold ls_waiters_mutex or are handling a stub message.
 *
 * ms may be NULL; it is only consulted (m_result) for the convert-reply
 * case below.
 *
 * Returns 0 when a wait was removed, -1 when the reply did not match any
 * wait state and nothing was changed.
 */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                                struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        /* reply to an overlapping unlock: clear the overlap flag */
        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        /* reply to an overlapping cancel: clear the overlap flag */
        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* Cancel state was preemptively cleared by a successful convert,
           see next comment, nothing to do. */

        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
                log_debug(ls, "remwait %x cancel_reply wait_type %d",
                          lkb->lkb_id, lkb->lkb_wait_type);
                return -1;
        }

        /* Remove for the convert reply, and preemptively remove for the
           cancel reply.  A convert has been granted while there's still
           an outstanding cancel on it (the cancel is moot and the result
           in the cancel reply should be 0).  We preempt the cancel reply
           because the app gets the convert result and then can follow up
           with another op, like convert.  This subsequent op would see the
           lingering state of the cancel and fail with -EBUSY. */

        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
            is_overlap_cancel(lkb) && ms && !ms->m_result) {
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                /* this decrement is for the cancel; the one at out_del
                   is for the convert itself */
                lkb->lkb_wait_count--;
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
                  lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't recvd a reply
           to the op that was in progress prior to the unlock/cancel; we
           give up on any reply to the earlier op.  FIXME: not sure when/how
           this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        /* see the RESEND note above this function */
        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}
1042
/*
 * Locked wrapper around _remove_from_waiters() for callers that have no
 * reply message to pass (ms is NULL).  Takes and releases ls_waiters_mutex
 * and returns the result of _remove_from_waiters() unchanged.
 */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
1053
1054/* Handles situations where we might be processing a "fake" or "stub" reply in
1055   which we can't try to take waiters_mutex again. */
1056
1057static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1058{
1059        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1060        int error;
1061
1062        if (ms->m_flags != DLM_IFL_STUB_MS)
1063                mutex_lock(&ls->ls_waiters_mutex);
1064        error = _remove_from_waiters(lkb, ms->m_type, ms);
1065        if (ms->m_flags != DLM_IFL_STUB_MS)
1066                mutex_unlock(&ls->ls_waiters_mutex);
1067        return error;
1068}
1069
1070static void dir_remove(struct dlm_rsb *r)
1071{
1072        int to_nodeid;
1073
1074        if (dlm_no_directory(r->res_ls))
1075                return;
1076
1077        to_nodeid = dlm_dir_nodeid(r);
1078        if (to_nodeid != dlm_our_nodeid())
1079                send_remove(r);
1080        else
1081                dlm_dir_remove_entry(r->res_ls, to_nodeid,
1082                                     r->res_name, r->res_length);
1083}
1084
1085/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1086   found since they are in order of newest to oldest? */
1087
/*
 * Free rsbs on the toss list of hash bucket b whose toss time is at least
 * dlm_config.ci_toss_secs seconds in the past.
 *
 * Returns the number of rsbs freed.
 */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct dlm_rsb *r;
        int count = 0, found;

        /* each pass handles at most one rsb; the bucket lock is released
           before the rsb is freed, so the scan starts over every time */
        for (;;) {
                found = 0;
                spin_lock(&ls->ls_rsbtbl[b].lock);
                /* walk from the tail of the toss list and stop at the
                   first rsb that is due */
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                /* nothing due in this bucket: done */
                if (!found) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                /* drop a reference; only when that was the last one is the
                   rsb unlinked and freed */
                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
                        spin_unlock(&ls->ls_rsbtbl[b].lock);

                        /* only the master removes the directory entry */
                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
                        /* other references remain; the rsb stays on the
                           toss list and will be found again next pass */
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}
1126
1127void dlm_scan_rsbs(struct dlm_ls *ls)
1128{
1129        int i;
1130
1131        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1132                shrink_bucket(ls, i);
1133                if (dlm_locking_stopped(ls))
1134                        break;
1135                cond_resched();
1136        }
1137}
1138
1139static void add_timeout(struct dlm_lkb *lkb)
1140{
1141        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1142
1143        if (is_master_copy(lkb))
1144                return;
1145
1146        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1147            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1148                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1149                goto add_it;
1150        }
1151        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1152                goto add_it;
1153        return;
1154
1155 add_it:
1156        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1157        mutex_lock(&ls->ls_timeout_mutex);
1158        hold_lkb(lkb);
1159        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1160        mutex_unlock(&ls->ls_timeout_mutex);
1161}
1162
1163static void del_timeout(struct dlm_lkb *lkb)
1164{
1165        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1166
1167        mutex_lock(&ls->ls_timeout_mutex);
1168        if (!list_empty(&lkb->lkb_time_list)) {
1169                list_del_init(&lkb->lkb_time_list);
1170                unhold_lkb(lkb);
1171        }
1172        mutex_unlock(&ls->ls_timeout_mutex);
1173}
1174
1175/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1176   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1177   and then lock rsb because of lock ordering in add_timeout.  We may need
1178   to specify some special timeout-related bits in the lkb that are just to
1179   be accessed under the timeout_mutex. */
1180
/*
 * Process the lockspace timeout list: for each lkb that has waited long
 * enough, issue a one-time warning (DLM_IFL_WATCH_TIMEWARN) and/or cancel
 * the lock (DLM_LKF_TIMEOUT).  One lkb is handled per pass of the outer
 * loop; the loop ends when no lkb on the list is due or when locking has
 * been stopped.
 */
void dlm_scan_timeout(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
        s64 wait_us;

        for (;;) {
                if (dlm_locking_stopped(ls))
                        break;

                do_cancel = 0;
                do_warn = 0;
                mutex_lock(&ls->ls_timeout_mutex);
                /* find the first lkb that is due for a cancel or warning */
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
                                                        lkb->lkb_timestamp));

                        /* the *_cs values are scaled by 10000 to compare
                           with microseconds (presumably centiseconds) */
                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
                            wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;

                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;

                        if (!do_cancel && !do_warn)
                                continue;
                        /* reference is taken under timeout_mutex and
                           dropped by dlm_put_lkb() at the end of the pass */
                        hold_lkb(lkb);
                        break;
                }
                mutex_unlock(&ls->ls_timeout_mutex);

                /* walked the whole list without finding anything due */
                if (!do_cancel && !do_warn)
                        break;

                /* the rsb is locked only after timeout_mutex has been
                   released; del_timeout() below retakes timeout_mutex */
                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                if (do_warn) {
                        /* clear flag so we only warn once */
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        /* without TIMEOUT there is nothing left to watch */
                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
                                del_timeout(lkb);
                        dlm_timeout_warn(lkb);
                }

                if (do_cancel) {
                        log_debug(ls, "timeout cancel %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
                        del_timeout(lkb);
                        _cancel_lock(r, lkb);
                }

                unlock_rsb(r);
                unhold_rsb(r);
                dlm_put_lkb(lkb);
        }
}
1244
1245/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1246   dlm_recoverd before checking/setting ls_recover_begin. */
1247
1248void dlm_adjust_timeouts(struct dlm_ls *ls)
1249{
1250        struct dlm_lkb *lkb;
1251        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1252
1253        ls->ls_recover_begin = 0;
1254        mutex_lock(&ls->ls_timeout_mutex);
1255        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1256                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1257        mutex_unlock(&ls->ls_timeout_mutex);
1258
1259        if (!dlm_config.ci_waitwarn_us)
1260                return;
1261
1262        mutex_lock(&ls->ls_waiters_mutex);
1263        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1264                if (ktime_to_us(lkb->lkb_wait_time))
1265                        lkb->lkb_wait_time = ktime_get();
1266        }
1267        mutex_unlock(&ls->ls_waiters_mutex);
1268}
1269
1270/* lkb is master or local copy */
1271
1272static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273{
1274        int b, len = r->res_ls->ls_lvblen;
1275
1276        /* b=1 lvb returned to caller
1277           b=0 lvb written to rsb or invalidated
1278           b=-1 do nothing */
1279
1280        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1281
1282        if (b == 1) {
1283                if (!lkb->lkb_lvbptr)
1284                        return;
1285
1286                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1287                        return;
1288
1289                if (!r->res_lvbptr)
1290                        return;
1291
1292                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1293                lkb->lkb_lvbseq = r->res_lvbseq;
1294
1295        } else if (b == 0) {
1296                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1297                        rsb_set_flag(r, RSB_VALNOTVALID);
1298                        return;
1299                }
1300
1301                if (!lkb->lkb_lvbptr)
1302                        return;
1303
1304                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1305                        return;
1306
1307                if (!r->res_lvbptr)
1308                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1309
1310                if (!r->res_lvbptr)
1311                        return;
1312
1313                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1314                r->res_lvbseq++;
1315                lkb->lkb_lvbseq = r->res_lvbseq;
1316                rsb_clear_flag(r, RSB_VALNOTVALID);
1317        }
1318
1319        if (rsb_flag(r, RSB_VALNOTVALID))
1320                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1321}
1322
1323static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324{
1325        if (lkb->lkb_grmode < DLM_LOCK_PW)
1326                return;
1327
1328        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1329                rsb_set_flag(r, RSB_VALNOTVALID);
1330                return;
1331        }
1332
1333        if (!lkb->lkb_lvbptr)
1334                return;
1335
1336        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1337                return;
1338
1339        if (!r->res_lvbptr)
1340                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1341
1342        if (!r->res_lvbptr)
1343                return;
1344
1345        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1346        r->res_lvbseq++;
1347        rsb_clear_flag(r, RSB_VALNOTVALID);
1348}
1349
1350/* lkb is process copy (pc) */
1351
1352static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1353                            struct dlm_message *ms)
1354{
1355        int b;
1356
1357        if (!lkb->lkb_lvbptr)
1358                return;
1359
1360        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1361                return;
1362
1363        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1364        if (b == 1) {
1365                int len = receive_extralen(ms);
1366                if (len > DLM_RESNAME_MAXLEN)
1367                        len = DLM_RESNAME_MAXLEN;
1368                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1369                lkb->lkb_lvbseq = ms->m_lvbseq;
1370        }
1371}
1372
1373/* Manipulate lkb's on rsb's convert/granted/waiting queues
1374   remove_lock -- used for unlock, removes lkb from granted
1375   revert_lock -- used for cancel, moves lkb from convert to granted
1376   grant_lock  -- used for request and convert, adds lkb to granted or
1377                  moves lkb from convert or waiting to granted
1378
1379   Each of these is used for master or local copy lkb's.  There is
1380   also a _pc() variation used to make the corresponding change on
1381   a process copy (pc) lkb. */
1382
/*
 * Common part of removing a lock: take lkb off its rsb queue, mark it as
 * no longer granted (grmode = DLM_LOCK_IV) and drop a reference.
 */
static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}
1391
/*
 * Remove a master or local copy lkb: update the rsb's lock value block
 * for the unlock first, then remove the lock.
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}
1397
/*
 * Remove a process copy lkb.  Unlike remove_lock() this does not touch
 * the rsb's lock value block.
 */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1402
1403/* returns: 0 did nothing
1404            1 moved lock to granted
1405           -1 removed lock */
1406
1407static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408{
1409        int rv = 0;
1410
1411        lkb->lkb_rqmode = DLM_LOCK_IV;
1412
1413        switch (lkb->lkb_status) {
1414        case DLM_LKSTS_GRANTED:
1415                break;
1416        case DLM_LKSTS_CONVERT:
1417                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1418                rv = 1;
1419                break;
1420        case DLM_LKSTS_WAITING:
1421                del_lkb(r, lkb);
1422                lkb->lkb_grmode = DLM_LOCK_IV;
1423                /* this unhold undoes the original ref from create_lkb()
1424                   so this leads to the lkb being freed */
1425                unhold_lkb(lkb);
1426                rv = -1;
1427                break;
1428        default:
1429                log_print("invalid status for revert %d", lkb->lkb_status);
1430        }
1431        return rv;
1432}
1433
/*
 * Process copy variant of revert_lock(); currently identical, it simply
 * forwards to revert_lock() and returns its result.
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}
1438
1439static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1440{
1441        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1442                lkb->lkb_grmode = lkb->lkb_rqmode;
1443                if (lkb->lkb_status)
1444                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1445                else
1446                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1447        }
1448
1449        lkb->lkb_rqmode = DLM_LOCK_IV;
1450}
1451
/*
 * Grant a master or local copy lkb: perform the lock value block
 * operation for the mode change, grant the lock, and reset lkb_highbast
 * to 0.
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
        lkb->lkb_highbast = 0;
}
1458
/*
 * Grant a process copy lkb: take the lock value block from the message
 * ms (see set_lvb_lock_pc()), then grant the lock.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}
1465
1466/* called by grant_pending_locks() which means an async grant message must
1467   be sent to the requesting node in addition to granting the lock if the
1468   lkb belongs to a remote node. */
1469
/*
 * Grant lkb and notify its owner: a master copy gets a grant message
 * sent for it, any other lkb gets a completion callback queued with
 * status 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb)) {
                queue_cast(r, lkb, 0);
                return;
        }

        send_grant(r, lkb);
}
1478
1479/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1480   change the granted/requested modes.  We're munging things accordingly in
1481   the process copy.
1482   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1483   conversion deadlock
1484   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1485   compatible with other granted locks */
1486
1487static void munge_demoted(struct dlm_lkb *lkb)
1488{
1489        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1490                log_print("munge_demoted %x invalid modes gr %d rq %d",
1491                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1492                return;
1493        }
1494
1495        lkb->lkb_grmode = DLM_LOCK_NL;
1496}
1497
1498static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1499{
1500        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1501            ms->m_type != DLM_MSG_GRANT) {
1502                log_print("munge_altmode %x invalid reply type %d",
1503                          lkb->lkb_id, ms->m_type);
1504                return;
1505        }
1506
1507        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1508                lkb->lkb_rqmode = DLM_LOCK_PR;
1509        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1510                lkb->lkb_rqmode = DLM_LOCK_CW;
1511        else {
1512                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1513                dlm_print_lkb(lkb);
1514        }
1515}
1516
1517static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1518{
1519        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1520                                           lkb_statequeue);
1521        if (lkb->lkb_id == first->lkb_id)
1522                return 1;
1523
1524        return 0;
1525}
1526
1527/* Check if the given lkb conflicts with another lkb on the queue. */
1528
1529static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1530{
1531        struct dlm_lkb *this;
1532
1533        list_for_each_entry(this, head, lkb_statequeue) {
1534                if (this == lkb)
1535                        continue;
1536                if (!modes_compat(this, lkb))
1537                        return 1;
1538        }
1539        return 0;
1540}
1541
1542/*
1543 * "A conversion deadlock arises with a pair of lock requests in the converting
1544 * queue for one resource.  The granted mode of each lock blocks the requested
1545 * mode of the other lock."
1546 *
1547 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1548 * convert queue from being granted, then deadlk/demote lkb.
1549 *
1550 * Example:
1551 * Granted Queue: empty
1552 * Convert Queue: NL->EX (first lock)
1553 *                PR->EX (second lock)
1554 *
1555 * The first lock can't be granted because of the granted mode of the second
1556 * lock and the second lock can't be granted because it's not first in the
1557 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1558 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1559 * flag set and return DEMOTED in the lksb flags.
1560 *
1561 * Originally, this function detected conv-deadlk in a more limited scope:
1562 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1563 * - if lkb1 was the first entry in the queue (not just earlier), and was
1564 *   blocked by the granted mode of lkb2, and there was nothing on the
1565 *   granted queue preventing lkb1 from being granted immediately, i.e.
1566 *   lkb2 was the only thing preventing lkb1 from being granted.
1567 *
1568 * That second condition meant we'd only say there was conv-deadlk if
1569 * resolving it (by demotion) would lead to the first lock on the convert
1570 * queue being granted right away.  It allowed conversion deadlocks to exist
1571 * between locks on the convert queue while they couldn't be granted anyway.
1572 *
1573 * Now, we detect and take action on conversion deadlocks immediately when
1574 * they're created, even if they may not be immediately consequential.  If
1575 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1576 * mode that would prevent lkb1's conversion from being granted, we do a
1577 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1578 * I think this means that the lkb_is_ahead condition below should always
1579 * be zero, i.e. there will never be conv-deadlk between two locks that are
1580 * both already on the convert queue.
1581 */
1582
1583static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1584{
1585        struct dlm_lkb *lkb1;
1586        int lkb_is_ahead = 0;
1587
1588        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1589                if (lkb1 == lkb2) {
1590                        lkb_is_ahead = 1;
1591                        continue;
1592                }
1593
1594                if (!lkb_is_ahead) {
1595                        if (!modes_compat(lkb2, lkb1))
1596                                return 1;
1597                } else {
1598                        if (!modes_compat(lkb2, lkb1) &&
1599                            !modes_compat(lkb1, lkb2))
1600                                return 1;
1601                }
1602        }
1603        return 0;
1604}
1605
1606/*
1607 * Return 1 if the lock can be granted, 0 otherwise.
1608 * Also detect and resolve conversion deadlocks.
1609 *
1610 * lkb is the lock to be granted
1611 *
1612 * now is 1 if the function is being called in the context of the
1613 * immediate request, it is 0 if called later, after the lock has been
1614 * queued.
1615 *
1616 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1617 */
1618
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
        /* a lock with a valid granted mode is being converted; otherwise
           this is a new request */
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

        /*
         * 6-10: Version 5.4 introduced an option to address the phenomenon of
         * a new request for a NL mode lock being blocked.
         *
         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
         * request, then it would be granted.  In essence, the use of this flag
         * tells the Lock Manager to expedite this request by not considering
         * what may be in the CONVERTING or WAITING queues...  As of this
         * writing, the EXPEDITE flag can be used only with new requests for NL
         * mode locks.  This flag is not valid for conversion requests.
         *
         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
         * conversion or used with a non-NL requested mode.  We also know an
         * EXPEDITE request is always granted immediately, so now must always
         * be 1.  The full condition to grant an expedite request: (now &&
         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
         * therefore be shortened to just checking the flag.
         */

        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
                return 1;

        /*
         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
         * added to the remaining conditions.
         */

        if (queue_conflict(&r->res_grantqueue, lkb))
                goto out;

        /*
         * 6-3: By default, a conversion request is immediately granted if the
         * requested mode is compatible with the modes of all other granted
         * locks
         */

        if (queue_conflict(&r->res_convertqueue, lkb))
                goto out;

        /*
         * 6-5: But the default algorithm for deciding whether to grant or
         * queue conversion requests does not by itself guarantee that such
         * requests are serviced on a "first come first serve" basis.  This, in
         * turn, can lead to a phenomenon known as "indefinite postponement".
         *
         * 6-7: This issue is dealt with by using the optional QUECVT flag with
         * the system service employed to request a lock conversion.  This flag
         * forces certain conversion requests to be queued, even if they are
         * compatible with the granted modes of other locks on the same
         * resource.  Thus, the use of this flag results in conversion requests
         * being ordered on a "first come first serve" basis.
         *
         * DCT: This condition is all about new conversions being able to occur
         * "in place" while the lock remains on the granted queue (assuming
         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
         * doesn't _have_ to go onto the convert queue where it's processed in
         * order.  The "now" variable is necessary to distinguish converts
         * being received and processed for the first time now, because once a
         * convert is moved to the conversion queue the condition below applies
         * requiring fifo granting.
         */

        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
                return 1;

        /*
         * The NOORDER flag is set to avoid the standard vms rules on grant
         * order.
         */

        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
                return 1;

        /*
         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
         * granted until all other conversion requests ahead of it are granted
         * and/or canceled.
         */

        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
                return 1;

        /*
         * 6-4: By default, a new request is immediately granted only if all
         * three of the following conditions are satisfied when the request is
         * issued:
         * - The queue of ungranted conversion requests for the resource is
         *   empty.
         * - The queue of ungranted new requests for the resource is empty.
         * - The mode of the new request is compatible with the most
         *   restrictive mode of all granted locks on the resource.
         */

        if (now && !conv && list_empty(&r->res_convertqueue) &&
            list_empty(&r->res_waitqueue))
                return 1;

        /*
         * 6-4: Once a lock request is in the queue of ungranted new requests,
         * it cannot be granted until the queue of ungranted conversion
         * requests is empty, all ungranted new requests ahead of it are
         * granted and/or canceled, and it is compatible with the granted mode
         * of the most restrictive lock granted on the resource.
         */

        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;
 out:
        /* a queue conflict was found, or no grant rule above applied */
        return 0;
}
1734
1735static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1736                          int *err)
1737{
1738        int rv;
1739        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1740        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1741
1742        if (err)
1743                *err = 0;
1744
1745        rv = _can_be_granted(r, lkb, now);
1746        if (rv)
1747                goto out;
1748
1749        /*
1750         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1751         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1752         * cancels one of the locks.
1753         */
1754
1755        if (is_convert && can_be_queued(lkb) &&
1756            conversion_deadlock_detect(r, lkb)) {
1757                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1758                        lkb->lkb_grmode = DLM_LOCK_NL;
1759                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1760                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1761                        if (err)
1762                                *err = -EDEADLK;
1763                        else {
1764                                log_print("can_be_granted deadlock %x now %d",
1765                                          lkb->lkb_id, now);
1766                                dlm_dump_rsb(r);
1767                        }
1768                }
1769                goto out;
1770        }
1771
1772        /*
1773         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1774         * to grant a request in a mode other than the normal rqmode.  It's a
1775         * simple way to provide a big optimization to applications that can
1776         * use them.
1777         */
1778
1779        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1780                alt = DLM_LOCK_PR;
1781        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1782                alt = DLM_LOCK_CW;
1783
1784        if (alt) {
1785                lkb->lkb_rqmode = alt;
1786                rv = _can_be_granted(r, lkb, now);
1787                if (rv)
1788                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1789                else
1790                        lkb->lkb_rqmode = rqmode;
1791        }
1792 out:
1793        return rv;
1794}
1795
1796/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1797   for locks pending on the convert list.  Once verified (watch for these
1798   log_prints), we should be able to just call _can_be_granted() and not
1799   bother with the demote/deadlk cases here (and there's no easy way to deal
1800   with a deadlk here, we'd have to generate something like grant_lock with
1801   the deadlk error.) */
1802
1803/* Returns the highest requested mode of all blocked conversions; sets
1804   cw if there's a blocked conversion to DLM_LOCK_CW. */
1805
1806static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1807{
1808        struct dlm_lkb *lkb, *s;
1809        int hi, demoted, quit, grant_restart, demote_restart;
1810        int deadlk;
1811
1812        quit = 0;
1813 restart:
1814        grant_restart = 0;
1815        demote_restart = 0;
1816        hi = DLM_LOCK_IV;
1817
1818        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1819                demoted = is_demoted(lkb);
1820                deadlk = 0;
1821
1822                if (can_be_granted(r, lkb, 0, &deadlk)) {
1823                        grant_lock_pending(r, lkb);
1824                        grant_restart = 1;
1825                        continue;
1826                }
1827
1828                if (!demoted && is_demoted(lkb)) {
1829                        log_print("WARN: pending demoted %x node %d %s",
1830                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1831                        demote_restart = 1;
1832                        continue;
1833                }
1834
1835                if (deadlk) {
1836                        log_print("WARN: pending deadlock %x node %d %s",
1837                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1838                        dlm_dump_rsb(r);
1839                        continue;
1840                }
1841
1842                hi = max_t(int, lkb->lkb_rqmode, hi);
1843
1844                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1845                        *cw = 1;
1846        }
1847
1848        if (grant_restart)
1849                goto restart;
1850        if (demote_restart && !quit) {
1851                quit = 1;
1852                goto restart;
1853        }
1854
1855        return max_t(int, high, hi);
1856}
1857
1858static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1859{
1860        struct dlm_lkb *lkb, *s;
1861
1862        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1863                if (can_be_granted(r, lkb, 0, NULL))
1864                        grant_lock_pending(r, lkb);
1865                else {
1866                        high = max_t(int, lkb->lkb_rqmode, high);
1867                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
1868                                *cw = 1;
1869                }
1870        }
1871
1872        return high;
1873}
1874
1875/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1876   on either the convert or waiting queue.
1877   high is the largest rqmode of all locks blocked on the convert or
1878   waiting queue. */
1879
1880static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1881{
1882        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1883                if (gr->lkb_highbast < DLM_LOCK_EX)
1884                        return 1;
1885                return 0;
1886        }
1887
1888        if (gr->lkb_highbast < high &&
1889            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1890                return 1;
1891        return 0;
1892}
1893
1894static void grant_pending_locks(struct dlm_rsb *r)
1895{
1896        struct dlm_lkb *lkb, *s;
1897        int high = DLM_LOCK_IV;
1898        int cw = 0;
1899
1900        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1901
1902        high = grant_pending_convert(r, high, &cw);
1903        high = grant_pending_wait(r, high, &cw);
1904
1905        if (high == DLM_LOCK_IV)
1906                return;
1907
1908        /*
1909         * If there are locks left on the wait/convert queue then send blocking
1910         * ASTs to granted locks based on the largest requested mode (high)
1911         * found above.
1912         */
1913
1914        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1915                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1916                        if (cw && high == DLM_LOCK_PR &&
1917                            lkb->lkb_grmode == DLM_LOCK_PR)
1918                                queue_bast(r, lkb, DLM_LOCK_CW);
1919                        else
1920                                queue_bast(r, lkb, high);
1921                        lkb->lkb_highbast = high;
1922                }
1923        }
1924}
1925
1926static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1927{
1928        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1929            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1930                if (gr->lkb_highbast < DLM_LOCK_EX)
1931                        return 1;
1932                return 0;
1933        }
1934
1935        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1936                return 1;
1937        return 0;
1938}
1939
1940static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1941                            struct dlm_lkb *lkb)
1942{
1943        struct dlm_lkb *gr;
1944
1945        list_for_each_entry(gr, head, lkb_statequeue) {
1946                /* skip self when sending basts to convertqueue */
1947                if (gr == lkb)
1948                        continue;
1949                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1950                        queue_bast(r, gr, lkb->lkb_rqmode);
1951                        gr->lkb_highbast = lkb->lkb_rqmode;
1952                }
1953        }
1954}
1955
1956static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1957{
1958        send_bast_queue(r, &r->res_grantqueue, lkb);
1959}
1960
1961static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1962{
1963        send_bast_queue(r, &r->res_grantqueue, lkb);
1964        send_bast_queue(r, &r->res_convertqueue, lkb);
1965}
1966
1967/* set_master(r, lkb) -- set the master nodeid of a resource
1968
1969   The purpose of this function is to set the nodeid field in the given
1970   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1971   known, it can just be copied to the lkb and the function will return
1972   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1973   before it can be copied to the lkb.
1974
1975   When the rsb nodeid is being looked up remotely, the initial lkb
1976   causing the lookup is kept on the ls_waiters list waiting for the
1977   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1978   on the rsb's res_lookup list until the master is verified.
1979
1980   Return values:
1981   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1982   1: the rsb master is not available and the lkb has been placed on
1983      a wait queue
1984*/
1985
1986static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1987{
1988        struct dlm_ls *ls = r->res_ls;
1989        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1990
1991        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1992                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1993                r->res_first_lkid = lkb->lkb_id;
1994                lkb->lkb_nodeid = r->res_nodeid;
1995                return 0;
1996        }
1997
1998        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1999                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2000                return 1;
2001        }
2002
2003        if (r->res_nodeid == 0) {
2004                lkb->lkb_nodeid = 0;
2005                return 0;
2006        }
2007
2008        if (r->res_nodeid > 0) {
2009                lkb->lkb_nodeid = r->res_nodeid;
2010                return 0;
2011        }
2012
2013        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2014
2015        dir_nodeid = dlm_dir_nodeid(r);
2016
2017        if (dir_nodeid != our_nodeid) {
2018                r->res_first_lkid = lkb->lkb_id;
2019                send_lookup(r, lkb);
2020                return 1;
2021        }
2022
2023        for (i = 0; i < 2; i++) {
2024                /* It's possible for dlm_scand to remove an old rsb for
2025                   this same resource from the toss list, us to create
2026                   a new one, look up the master locally, and find it
2027                   already exists just before dlm_scand does the
2028                   dir_remove() on the previous rsb. */
2029
2030                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2031                                       r->res_length, &ret_nodeid);
2032                if (!error)
2033                        break;
2034                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2035                schedule();
2036        }
2037        if (error && error != -EEXIST)
2038                return error;
2039
2040        if (ret_nodeid == our_nodeid) {
2041                r->res_first_lkid = 0;
2042                r->res_nodeid = 0;
2043                lkb->lkb_nodeid = 0;
2044        } else {
2045                r->res_first_lkid = lkb->lkb_id;
2046                r->res_nodeid = ret_nodeid;
2047                lkb->lkb_nodeid = ret_nodeid;
2048        }
2049        return 0;
2050}
2051
2052static void process_lookup_list(struct dlm_rsb *r)
2053{
2054        struct dlm_lkb *lkb, *safe;
2055
2056        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2057                list_del_init(&lkb->lkb_rsb_lookup);
2058                _request_lock(r, lkb);
2059                schedule();
2060        }
2061}
2062
2063/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2064
2065static void confirm_master(struct dlm_rsb *r, int error)
2066{
2067        struct dlm_lkb *lkb;
2068
2069        if (!r->res_first_lkid)
2070                return;
2071
2072        switch (error) {
2073        case 0:
2074        case -EINPROGRESS:
2075                r->res_first_lkid = 0;
2076                process_lookup_list(r);
2077                break;
2078
2079        case -EAGAIN:
2080        case -EBADR:
2081        case -ENOTBLK:
2082                /* the remote request failed and won't be retried (it was
2083                   a NOQUEUE, or has been canceled/unlocked); make a waiting
2084                   lkb the first_lkid */
2085
2086                r->res_first_lkid = 0;
2087
2088                if (!list_empty(&r->res_lookup)) {
2089                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2090                                         lkb_rsb_lookup);
2091                        list_del_init(&lkb->lkb_rsb_lookup);
2092                        r->res_first_lkid = lkb->lkb_id;
2093                        _request_lock(r, lkb);
2094                }
2095                break;
2096
2097        default:
2098                log_error(r->res_ls, "confirm_master unknown error %d", error);
2099        }
2100}
2101
2102static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2103                         int namelen, unsigned long timeout_cs,
2104                         void (*ast) (void *astparam),
2105                         void *astparam,
2106                         void (*bast) (void *astparam, int mode),
2107                         struct dlm_args *args)
2108{
2109        int rv = -EINVAL;
2110
2111        /* check for invalid arg usage */
2112
2113        if (mode < 0 || mode > DLM_LOCK_EX)
2114                goto out;
2115
2116        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2117                goto out;
2118
2119        if (flags & DLM_LKF_CANCEL)
2120                goto out;
2121
2122        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2123                goto out;
2124
2125        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2126                goto out;
2127
2128        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2129                goto out;
2130
2131        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2132                goto out;
2133
2134        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2135                goto out;
2136
2137        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2138                goto out;
2139
2140        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2141                goto out;
2142
2143        if (!ast || !lksb)
2144                goto out;
2145
2146        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2147                goto out;
2148
2149        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2150                goto out;
2151
2152        /* these args will be copied to the lkb in validate_lock_args,
2153           it cannot be done now because when converting locks, fields in
2154           an active lkb cannot be modified before locking the rsb */
2155
2156        args->flags = flags;
2157        args->astfn = ast;
2158        args->astparam = astparam;
2159        args->bastfn = bast;
2160        args->timeout = timeout_cs;
2161        args->mode = mode;
2162        args->lksb = lksb;
2163        rv = 0;
2164 out:
2165        return rv;
2166}
2167
2168static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2169{
2170        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2171                      DLM_LKF_FORCEUNLOCK))
2172                return -EINVAL;
2173
2174        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2175                return -EINVAL;
2176
2177        args->flags = flags;
2178        args->astparam = astarg;
2179        return 0;
2180}
2181
2182static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2183                              struct dlm_args *args)
2184{
2185        int rv = -EINVAL;
2186
2187        if (args->flags & DLM_LKF_CONVERT) {
2188                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2189                        goto out;
2190
2191                if (args->flags & DLM_LKF_QUECVT &&
2192                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2193                        goto out;
2194
2195                rv = -EBUSY;
2196                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2197                        goto out;
2198
2199                if (lkb->lkb_wait_type)
2200                        goto out;
2201
2202                if (is_overlap(lkb))
2203                        goto out;
2204        }
2205
2206        lkb->lkb_exflags = args->flags;
2207        lkb->lkb_sbflags = 0;
2208        lkb->lkb_astfn = args->astfn;
2209        lkb->lkb_astparam = args->astparam;
2210        lkb->lkb_bastfn = args->bastfn;
2211        lkb->lkb_rqmode = args->mode;
2212        lkb->lkb_lksb = args->lksb;
2213        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2214        lkb->lkb_ownpid = (int) current->pid;
2215        lkb->lkb_timeout_cs = args->timeout;
2216        rv = 0;
2217 out:
2218        if (rv)
2219                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2220                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2221                          lkb->lkb_status, lkb->lkb_wait_type,
2222                          lkb->lkb_resource->res_name);
2223        return rv;
2224}
2225
2226/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2227   for success */
2228
2229/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2230   because there may be a lookup in progress and it's valid to do
2231   cancel/unlockf on it */
2232
2233static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2234{
2235        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2236        int rv = -EINVAL;
2237
2238        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2239                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2240                dlm_print_lkb(lkb);
2241                goto out;
2242        }
2243
2244        /* an lkb may still exist even though the lock is EOL'ed due to a
2245           cancel, unlock or failed noqueue request; an app can't use these
2246           locks; return same error as if the lkid had not been found at all */
2247
2248        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2249                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2250                rv = -ENOENT;
2251                goto out;
2252        }
2253
2254        /* an lkb may be waiting for an rsb lookup to complete where the
2255           lookup was initiated by another lock */
2256
2257        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2258                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2259                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2260                        list_del_init(&lkb->lkb_rsb_lookup);
2261                        queue_cast(lkb->lkb_resource, lkb,
2262                                   args->flags & DLM_LKF_CANCEL ?
2263                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2264                        unhold_lkb(lkb); /* undoes create_lkb() */
2265                }
2266                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2267                rv = -EBUSY;
2268                goto out;
2269        }
2270
2271        /* cancel not allowed with another cancel/unlock in progress */
2272
2273        if (args->flags & DLM_LKF_CANCEL) {
2274                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2275                        goto out;
2276
2277                if (is_overlap(lkb))
2278                        goto out;
2279
2280                /* don't let scand try to do a cancel */
2281                del_timeout(lkb);
2282
2283                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2284                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2285                        rv = -EBUSY;
2286                        goto out;
2287                }
2288
2289                /* there's nothing to cancel */
2290                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2291                    !lkb->lkb_wait_type) {
2292                        rv = -EBUSY;
2293                        goto out;
2294                }
2295
2296                switch (lkb->lkb_wait_type) {
2297                case DLM_MSG_LOOKUP:
2298                case DLM_MSG_REQUEST:
2299                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2300                        rv = -EBUSY;
2301                        goto out;
2302                case DLM_MSG_UNLOCK:
2303                case DLM_MSG_CANCEL:
2304                        goto out;
2305                }
2306                /* add_to_waiters() will set OVERLAP_CANCEL */
2307                goto out_ok;
2308        }
2309
2310        /* do we need to allow a force-unlock if there's a normal unlock
2311           already in progress?  in what conditions could the normal unlock
2312           fail such that we'd want to send a force-unlock to be sure? */
2313
2314        if (args->flags & DLM_LKF_FORCEUNLOCK) {
2315                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2316                        goto out;
2317
2318                if (is_overlap_unlock(lkb))
2319                        goto out;
2320
2321                /* don't let scand try to do a cancel */
2322                del_timeout(lkb);
2323
2324                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2325                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2326                        rv = -EBUSY;
2327                        goto out;
2328                }
2329
2330                switch (lkb->lkb_wait_type) {
2331                case DLM_MSG_LOOKUP:
2332                case DLM_MSG_REQUEST:
2333                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2334                        rv = -EBUSY;
2335                        goto out;
2336                case DLM_MSG_UNLOCK:
2337                        goto out;
2338                }
2339                /* add_to_waiters() will set OVERLAP_UNLOCK */
2340                goto out_ok;
2341        }
2342
2343        /* normal unlock not allowed if there's any op in progress */
2344        rv = -EBUSY;
2345        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2346                goto out;
2347
2348 out_ok:
2349        /* an overlapping op shouldn't blow away exflags from other op */
2350        lkb->lkb_exflags |= args->flags;
2351        lkb->lkb_sbflags = 0;
2352        lkb->lkb_astparam = args->astparam;
2353        rv = 0;
2354 out:
2355        if (rv)
2356                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2357                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2358                          args->flags, lkb->lkb_wait_type,
2359                          lkb->lkb_resource->res_name);
2360        return rv;
2361}
2362
2363/*
2364 * Four stage 4 varieties:
2365 * do_request(), do_convert(), do_unlock(), do_cancel()
2366 * These are called on the master node for the given lock and
2367 * from the central locking logic.
2368 */
2369
2370static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2371{
2372        int error = 0;
2373
2374        if (can_be_granted(r, lkb, 1, NULL)) {
2375                grant_lock(r, lkb);
2376                queue_cast(r, lkb, 0);
2377                goto out;
2378        }
2379
2380        if (can_be_queued(lkb)) {
2381                error = -EINPROGRESS;
2382                add_lkb(r, lkb, DLM_LKSTS_WAITING);
2383                add_timeout(lkb);
2384                goto out;
2385        }
2386
2387        error = -EAGAIN;
2388        queue_cast(r, lkb, -EAGAIN);
2389 out:
2390        return error;
2391}
2392
/* Follow-up to do_request(); 'error' is do_request()'s return value.

   -EAGAIN:      blocking asts go to the grant and convert queues, but
                 only if force_blocking_asts(lkb) says so
   -EINPROGRESS: blocking asts go to the grant queue
   Any other value: nothing to do. */

static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == -EAGAIN) {
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
	} else if (error == -EINPROGRESS) {
		send_blocking_asts(r, lkb);
	}
}
2406
2407static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2408{
2409        int error = 0;
2410        int deadlk = 0;
2411
2412        /* changing an existing lock may allow others to be granted */
2413
2414        if (can_be_granted(r, lkb, 1, &deadlk)) {
2415                grant_lock(r, lkb);
2416                queue_cast(r, lkb, 0);
2417                goto out;
2418        }
2419
2420        /* can_be_granted() detected that this lock would block in a conversion
2421           deadlock, so we leave it on the granted queue and return EDEADLK in
2422           the ast for the convert. */
2423
2424        if (deadlk) {
2425                /* it's left on the granted queue */
2426                revert_lock(r, lkb);
2427                queue_cast(r, lkb, -EDEADLK);
2428                error = -EDEADLK;
2429                goto out;
2430        }
2431
2432        /* is_demoted() means the can_be_granted() above set the grmode
2433           to NL, and left us on the granted queue.  This auto-demotion
2434           (due to CONVDEADLK) might mean other locks, and/or this lock, are
2435           now grantable.  We have to try to grant other converting locks
2436           before we try again to grant this one. */
2437
2438        if (is_demoted(lkb)) {
2439                grant_pending_convert(r, DLM_LOCK_IV, NULL);
2440                if (_can_be_granted(r, lkb, 1)) {
2441                        grant_lock(r, lkb);
2442                        queue_cast(r, lkb, 0);
2443                        goto out;
2444                }
2445                /* else fall through and move to convert queue */
2446        }
2447
2448        if (can_be_queued(lkb)) {
2449                error = -EINPROGRESS;
2450                del_lkb(r, lkb);
2451                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2452                add_timeout(lkb);
2453                goto out;
2454        }
2455
2456        error = -EAGAIN;
2457        queue_cast(r, lkb, -EAGAIN);
2458 out:
2459        return error;
2460}
2461
/* Follow-up to do_convert(); 'error' is do_convert()'s return value.

   0:            other pending locks are tried (grant_pending_locks also
                 sends basts)
   -EAGAIN:      blocking asts go to the grant and convert queues, but
                 only if force_blocking_asts(lkb) says so
   -EINPROGRESS: blocking asts go to the grant queue
   Any other value: nothing to do. */

static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == 0) {
		grant_pending_locks(r);
	} else if (error == -EAGAIN) {
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
	} else if (error == -EINPROGRESS) {
		send_blocking_asts(r, lkb);
	}
}
2479
2480static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481{
2482        remove_lock(r, lkb);
2483        queue_cast(r, lkb, -DLM_EUNLOCK);
2484        return -DLM_EUNLOCK;
2485}
2486
/* Follow-up to do_unlock(): try to grant the locks still pending on r.
   lkb and error are not used. */

static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	(void) lkb;
	(void) error;

	grant_pending_locks(r);
}
2492
2493/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2494 
2495static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2496{
2497        int error;
2498
2499        error = revert_lock(r, lkb);
2500        if (error) {
2501                queue_cast(r, lkb, -DLM_ECANCEL);
2502                return -DLM_ECANCEL;
2503        }
2504        return 0;
2505}
2506
/* Follow-up to do_cancel(); 'error' is do_cancel()'s return value.
   Pending locks are only retried when the cancel did something
   (error is non-zero). */

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (!error)
		return;

	grant_pending_locks(r);
}
2513
2514/*
2515 * Four stage 3 varieties:
2516 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2517 */
2518
/* add a new lkb to a possibly new rsb, called by requesting process

   Returns a negative error from set_master(), 0 when set_master() left
   the lkb waiting for the master to become known, otherwise the result
   of send_request() (remote master) or do_request() (local master). */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		return error;

	/* positive: master not available yet, nothing more to do now */
	if (error > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	error = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, error);

	return error;
}
2547
/* change some property of an existing lkb, e.g. mode

   Returns the result of send_convert() when r is remote, otherwise the
   result of do_convert(). */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int result;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	result = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, result);

	return result;
}
2566
/* remove an existing lkb from the granted queue

   Returns the result of send_unlock() when r is remote, otherwise the
   result of do_unlock(). */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int result;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	result = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, result);

	return result;
}
2585
/* remove an existing lkb from the convert or wait queue

   Returns the result of send_cancel() when r is remote, otherwise the
   result of do_cancel(). */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int result;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	result = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, result);

	return result;
}
2604
2605/*
2606 * Four stage 2 varieties:
2607 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2608 */
2609
2610static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2611                        int len, struct dlm_args *args)
2612{
2613        struct dlm_rsb *r;
2614        int error;
2615
2616        error = validate_lock_args(ls, lkb, args);
2617        if (error)
2618                goto out;
2619
2620        error = find_rsb(ls, name, len, R_CREATE, &r);
2621        if (error)
2622                goto out;
2623
2624        lock_rsb(r);
2625
2626        attach_lkb(r, lkb);
2627        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2628
2629        error = _request_lock(r, lkb);
2630
2631        unlock_rsb(r);
2632        put_rsb(r);
2633
2634 out:
2635        return error;
2636}
2637
2638static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2639                        struct dlm_args *args)
2640{
2641        struct dlm_rsb *r;
2642        int error;
2643
2644        r = lkb->lkb_resource;
2645
2646        hold_rsb(r);
2647        lock_rsb(r);
2648
2649        error = validate_lock_args(ls, lkb, args);
2650        if (error)
2651                goto out;
2652
2653        error = _convert_lock(r, lkb);
2654 out:
2655        unlock_rsb(r);
2656        put_rsb(r);
2657        return error;
2658}
2659
2660static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2661                       struct dlm_args *args)
2662{
2663        struct dlm_rsb *r;
2664        int error;
2665
2666        r = lkb->lkb_resource;
2667
2668        hold_rsb(r);
2669        lock_rsb(r);
2670
2671        error = validate_unlock_args(lkb, args);
2672        if (error)
2673                goto out;
2674
2675        error = _unlock_lock(r, lkb);
2676 out:
2677        unlock_rsb(r);
2678        put_rsb(r);
2679        return error;
2680}
2681
2682static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2683                       struct dlm_args *args)
2684{
2685        struct dlm_rsb *r;
2686        int error;
2687
2688        r = lkb->lkb_resource;
2689
2690        hold_rsb(r);
2691        lock_rsb(r);
2692
2693        error = validate_unlock_args(lkb, args);
2694        if (error)
2695                goto out;
2696
2697        error = _cancel_lock(r, lkb);
2698 out:
2699        unlock_rsb(r);
2700        put_rsb(r);
2701        return error;
2702}
2703
2704/*
2705 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2706 */
2707
2708int dlm_lock(dlm_lockspace_t *lockspace,
2709             int mode,
2710             struct dlm_lksb *lksb,
2711             uint32_t flags,
2712             void *name,
2713             unsigned int namelen,
2714             uint32_t parent_lkid,
2715             void (*ast) (void *astarg),
2716             void *astarg,
2717             void (*bast) (void *astarg, int mode))
2718{
2719        struct dlm_ls *ls;
2720        struct dlm_lkb *lkb;
2721        struct dlm_args args;
2722        int error, convert = flags & DLM_LKF_CONVERT;
2723
2724        ls = dlm_find_lockspace_local(lockspace);
2725        if (!ls)
2726                return -EINVAL;
2727
2728        dlm_lock_recovery(ls);
2729
2730        if (convert)
2731                error = find_lkb(ls, lksb->sb_lkid, &lkb);
2732        else
2733                error = create_lkb(ls, &lkb);
2734
2735        if (error)
2736                goto out;
2737
2738        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2739                              astarg, bast, &args);
2740        if (error)
2741                goto out_put;
2742
2743        if (convert)
2744                error = convert_lock(ls, lkb, &args);
2745        else
2746                error = request_lock(ls, lkb, name, namelen, &args);
2747
2748        if (error == -EINPROGRESS)
2749                error = 0;
2750 out_put:
2751        if (convert || error)
2752                __put_lkb(ls, lkb);
2753        if (error == -EAGAIN || error == -EDEADLK)
2754                error = 0;
2755 out:
2756        dlm_unlock_recovery(ls);
2757        dlm_put_lockspace(ls);
2758        return error;
2759}
2760
2761int dlm_unlock(dlm_lockspace_t *lockspace,
2762               uint32_t lkid,
2763               uint32_t flags,
2764               struct dlm_lksb *lksb,
2765               void *astarg)
2766{
2767        struct dlm_ls *ls;
2768        struct dlm_lkb *lkb;
2769        struct dlm_args args;
2770        int error;
2771
2772        ls = dlm_find_lockspace_local(lockspace);
2773        if (!ls)
2774                return -EINVAL;
2775
2776        dlm_lock_recovery(ls);
2777
2778        error = find_lkb(ls, lkid, &lkb);
2779        if (error)
2780                goto out;
2781
2782        error = set_unlock_args(flags, astarg, &args);
2783        if (error)
2784                goto out_put;
2785
2786        if (flags & DLM_LKF_CANCEL)
2787                error = cancel_lock(ls, lkb, &args);
2788        else
2789                error = unlock_lock(ls, lkb, &args);
2790
2791        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2792                error = 0;
2793        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2794                error = 0;
2795 out_put:
2796        dlm_put_lkb(lkb);
2797 out:
2798        dlm_unlock_recovery(ls);
2799        dlm_put_lockspace(ls);
2800        return error;
2801}
2802
2803/*
2804 * send/receive routines for remote operations and replies
2805 *
2806 * send_args
2807 * send_common
2808 * send_request                 receive_request
2809 * send_convert                 receive_convert
2810 * send_unlock                  receive_unlock
2811 * send_cancel                  receive_cancel
2812 * send_grant                   receive_grant
2813 * send_bast                    receive_bast
2814 * send_lookup                  receive_lookup
2815 * send_remove                  receive_remove
2816 *
2817 *                              send_common_reply
2818 * receive_request_reply        send_request_reply
2819 * receive_convert_reply        send_convert_reply
2820 * receive_unlock_reply         send_unlock_reply
2821 * receive_cancel_reply         send_cancel_reply
2822 * receive_lookup_reply         send_lookup_reply
2823 */
2824
2825static int _create_message(struct dlm_ls *ls, int mb_len,
2826                           int to_nodeid, int mstype,
2827                           struct dlm_message **ms_ret,
2828                           struct dlm_mhandle **mh_ret)
2829{
2830        struct dlm_message *ms;
2831        struct dlm_mhandle *mh;
2832        char *mb;
2833
2834        /* get_buffer gives us a message handle (mh) that we need to
2835           pass into lowcomms_commit and a message buffer (mb) that we
2836           write our data into */
2837
2838        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2839        if (!mh)
2840                return -ENOBUFS;
2841
2842        memset(mb, 0, mb_len);
2843
2844        ms = (struct dlm_message *) mb;
2845
2846        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2847        ms->m_header.h_lockspace = ls->ls_global_id;
2848        ms->m_header.h_nodeid = dlm_our_nodeid();
2849        ms->m_header.h_length = mb_len;
2850        ms->m_header.h_cmd = DLM_MSG;
2851
2852        ms->m_type = mstype;
2853
2854        *mh_ret = mh;
2855        *ms_ret = ms;
2856        return 0;
2857}
2858
2859static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2860                          int to_nodeid, int mstype,
2861                          struct dlm_message **ms_ret,
2862                          struct dlm_mhandle **mh_ret)
2863{
2864        int mb_len = sizeof(struct dlm_message);
2865
2866        switch (mstype) {
2867        case DLM_MSG_REQUEST:
2868        case DLM_MSG_LOOKUP:
2869        case DLM_MSG_REMOVE:
2870                mb_len += r->res_length;
2871                break;
2872        case DLM_MSG_CONVERT:
2873        case DLM_MSG_UNLOCK:
2874        case DLM_MSG_REQUEST_REPLY:
2875        case DLM_MSG_CONVERT_REPLY:
2876        case DLM_MSG_GRANT:
2877                if (lkb && lkb->lkb_lvbptr)
2878                        mb_len += r->res_ls->ls_lvblen;
2879                break;
2880        }
2881
2882        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2883                               ms_ret, mh_ret);
2884}
2885
2886/* further lowcomms enhancements or alternate implementations may make
2887   the return value from this function useful at some point */
2888
/*
 * Finish a message built by create_message(): run dlm_message_out() on it
 * and commit the buffer behind mh to lowcomms.  Always returns 0.
 */
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2895
2896static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2897                      struct dlm_message *ms)
2898{
2899        ms->m_nodeid   = lkb->lkb_nodeid;
2900        ms->m_pid      = lkb->lkb_ownpid;
2901        ms->m_lkid     = lkb->lkb_id;
2902        ms->m_remid    = lkb->lkb_remid;
2903        ms->m_exflags  = lkb->lkb_exflags;
2904        ms->m_sbflags  = lkb->lkb_sbflags;
2905        ms->m_flags    = lkb->lkb_flags;
2906        ms->m_lvbseq   = lkb->lkb_lvbseq;
2907        ms->m_status   = lkb->lkb_status;
2908        ms->m_grmode   = lkb->lkb_grmode;
2909        ms->m_rqmode   = lkb->lkb_rqmode;
2910        ms->m_hash     = r->res_hash;
2911
2912        /* m_result and m_bastmode are set from function args,
2913           not from lkb fields */
2914
2915        if (lkb->lkb_bastfn)
2916                ms->m_asts |= DLM_CB_BAST;
2917        if (lkb->lkb_astfn)
2918                ms->m_asts |= DLM_CB_CAST;
2919
2920        /* compare with switch in create_message; send_remove() doesn't
2921           use send_args() */
2922
2923        switch (ms->m_type) {
2924        case DLM_MSG_REQUEST:
2925        case DLM_MSG_LOOKUP:
2926                memcpy(ms->m_extra, r->res_name, r->res_length);
2927                break;
2928        case DLM_MSG_CONVERT:
2929        case DLM_MSG_UNLOCK:
2930        case DLM_MSG_REQUEST_REPLY:
2931        case DLM_MSG_CONVERT_REPLY:
2932        case DLM_MSG_GRANT:
2933                if (!lkb->lkb_lvbptr)
2934                        break;
2935                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2936                break;
2937        }
2938}
2939
2940static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2941{
2942        struct dlm_message *ms;
2943        struct dlm_mhandle *mh;
2944        int to_nodeid, error;
2945
2946        to_nodeid = r->res_nodeid;
2947
2948        error = add_to_waiters(lkb, mstype, to_nodeid);
2949        if (error)
2950                return error;
2951
2952        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2953        if (error)
2954                goto fail;
2955
2956        send_args(r, lkb, ms);
2957
2958        error = send_message(mh, ms);
2959        if (error)
2960                goto fail;
2961        return 0;
2962
2963 fail:
2964        remove_from_waiters(lkb, msg_reply_type(mstype));
2965        return error;
2966}
2967
2968static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2969{
2970        return send_common(r, lkb, DLM_MSG_REQUEST);
2971}
2972
2973static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2974{
2975        int error;
2976
2977        error = send_common(r, lkb, DLM_MSG_CONVERT);
2978
2979        /* down conversions go without a reply from the master */
2980        if (!error && down_conversion(lkb)) {
2981                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2982                r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2983                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2984                r->res_ls->ls_stub_ms.m_result = 0;
2985                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2986        }
2987
2988        return error;
2989}
2990
2991/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2992   MASTER_UNCERTAIN to force the next request on the rsb to confirm
2993   that the master is still correct. */
2994
2995static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996{
2997        return send_common(r, lkb, DLM_MSG_UNLOCK);
2998}
2999
3000static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3001{
3002        return send_common(r, lkb, DLM_MSG_CANCEL);
3003}
3004
3005static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3006{
3007        struct dlm_message *ms;
3008        struct dlm_mhandle *mh;
3009        int to_nodeid, error;
3010
3011        to_nodeid = lkb->lkb_nodeid;
3012
3013        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3014        if (error)
3015                goto out;
3016
3017        send_args(r, lkb, ms);
3018
3019        ms->m_result = 0;
3020
3021        error = send_message(mh, ms);
3022 out:
3023        return error;
3024}
3025
3026static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3027{
3028        struct dlm_message *ms;
3029        struct dlm_mhandle *mh;
3030        int to_nodeid, error;
3031
3032        to_nodeid = lkb->lkb_nodeid;
3033
3034        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3035        if (error)
3036                goto out;
3037
3038        send_args(r, lkb, ms);
3039
3040        ms->m_bastmode = mode;
3041
3042        error = send_message(mh, ms);
3043 out:
3044        return error;
3045}
3046
3047static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048{
3049        struct dlm_message *ms;
3050        struct dlm_mhandle *mh;
3051        int to_nodeid, error;
3052
3053        to_nodeid = dlm_dir_nodeid(r);
3054
3055        error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3056        if (error)
3057                return error;
3058
3059        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3060        if (error)
3061                goto fail;
3062
3063        send_args(r, lkb, ms);
3064
3065        error = send_message(mh, ms);
3066        if (error)
3067                goto fail;
3068        return 0;
3069
3070 fail:
3071        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3072        return error;
3073}
3074
3075static int send_remove(struct dlm_rsb *r)
3076{
3077        struct dlm_message *ms;
3078        struct dlm_mhandle *mh;
3079        int to_nodeid, error;
3080
3081        to_nodeid = dlm_dir_nodeid(r);
3082
3083        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3084        if (error)
3085                goto out;
3086
3087        memcpy(ms->m_extra, r->res_name, r->res_length);
3088        ms->m_hash = r->res_hash;
3089
3090        error = send_message(mh, ms);
3091 out:
3092        return error;
3093}
3094
3095static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3096                             int mstype, int rv)
3097{
3098        struct dlm_message *ms;
3099        struct dlm_mhandle *mh;
3100        int to_nodeid, error;
3101
3102        to_nodeid = lkb->lkb_nodeid;
3103
3104        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3105        if (error)
3106                goto out;
3107
3108        send_args(r, lkb, ms);
3109
3110        ms->m_result = rv;
3111
3112        error = send_message(mh, ms);
3113 out:
3114        return error;
3115}
3116
3117static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3118{
3119        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3120}
3121
3122static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3123{
3124        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3125}
3126
3127static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3128{
3129        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3130}
3131
3132static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3133{
3134        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3135}
3136
3137static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3138                             int ret_nodeid, int rv)
3139{
3140        struct dlm_rsb *r = &ls->ls_stub_rsb;
3141        struct dlm_message *ms;
3142        struct dlm_mhandle *mh;
3143        int error, nodeid = ms_in->m_header.h_nodeid;
3144
3145        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3146        if (error)
3147                goto out;
3148
3149        ms->m_lkid = ms_in->m_lkid;
3150        ms->m_result = rv;
3151        ms->m_nodeid = ret_nodeid;
3152
3153        error = send_message(mh, ms);
3154 out:
3155        return error;
3156}
3157
3158/* which args we save from a received message depends heavily on the type
3159   of message, unlike the send side where we can safely send everything about
3160   the lkb for any type of message */
3161
3162static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3163{
3164        lkb->lkb_exflags = ms->m_exflags;
3165        lkb->lkb_sbflags = ms->m_sbflags;
3166        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3167                         (ms->m_flags & 0x0000FFFF);
3168}
3169
3170static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3171{
3172        if (ms->m_flags == DLM_IFL_STUB_MS)
3173                return;
3174
3175        lkb->lkb_sbflags = ms->m_sbflags;
3176        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3177                         (ms->m_flags & 0x0000FFFF);
3178}
3179
3180static int receive_extralen(struct dlm_message *ms)
3181{
3182        return (ms->m_header.h_length - sizeof(struct dlm_message));
3183}
3184
3185static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3186                       struct dlm_message *ms)
3187{
3188        int len;
3189
3190        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3191                if (!lkb->lkb_lvbptr)
3192                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3193                if (!lkb->lkb_lvbptr)
3194                        return -ENOMEM;
3195                len = receive_extralen(ms);
3196                if (len > DLM_RESNAME_MAXLEN)
3197                        len = DLM_RESNAME_MAXLEN;
3198                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3199        }
3200        return 0;
3201}
3202
/*
 * Placeholder blocking-ast callback; receive_request_args() installs it
 * when the message's m_asts has DLM_CB_BAST set.  It only logs, and
 * ignores both arguments.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
	return;
}
3207
/*
 * Placeholder completion-ast callback; receive_request_args() installs it
 * when the message's m_asts has DLM_CB_CAST set.  It only logs, and
 * ignores its argument.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
	return;
}
3212
3213static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3214                                struct dlm_message *ms)
3215{
3216        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3217        lkb->lkb_ownpid = ms->m_pid;
3218        lkb->lkb_remid = ms->m_lkid;
3219        lkb->lkb_grmode = DLM_LOCK_IV;
3220        lkb->lkb_rqmode = ms->m_rqmode;
3221
3222        lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3223        lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3224
3225        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3226                /* lkb was just created so there won't be an lvb yet */
3227                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3228                if (!lkb->lkb_lvbptr)
3229                        return -ENOMEM;
3230        }
3231
3232        return 0;
3233}
3234
3235static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3236                                struct dlm_message *ms)
3237{
3238        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3239                return -EBUSY;
3240
3241        if (receive_lvb(ls, lkb, ms))
3242                return -ENOMEM;
3243
3244        lkb->lkb_rqmode = ms->m_rqmode;
3245        lkb->lkb_lvbseq = ms->m_lvbseq;
3246
3247        return 0;
3248}
3249
3250static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3251                               struct dlm_message *ms)
3252{
3253        if (receive_lvb(ls, lkb, ms))
3254                return -ENOMEM;
3255        return 0;
3256}
3257
3258/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3259   uses to send a reply and that the remote end uses to process the reply. */
3260
3261static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3262{
3263        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3264        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3265        lkb->lkb_remid = ms->m_lkid;
3266}
3267
3268/* This is called after the rsb is locked so that we can safely inspect
3269   fields in the lkb. */
3270
3271static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3272{
3273        int from = ms->m_header.h_nodeid;
3274        int error = 0;
3275
3276        switch (ms->m_type) {
3277        case DLM_MSG_CONVERT:
3278        case DLM_MSG_UNLOCK:
3279        case DLM_MSG_CANCEL:
3280                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3281                        error = -EINVAL;
3282                break;
3283
3284        case DLM_MSG_CONVERT_REPLY:
3285        case DLM_MSG_UNLOCK_REPLY:
3286        case DLM_MSG_CANCEL_REPLY:
3287        case DLM_MSG_GRANT:
3288        case DLM_MSG_BAST:
3289                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3290                        error = -EINVAL;
3291                break;
3292
3293        case DLM_MSG_REQUEST_REPLY:
3294                if (!is_process_copy(lkb))
3295                        error = -EINVAL;
3296                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3297                        error = -EINVAL;
3298                break;
3299
3300        default:
3301                error = -EINVAL;
3302        }
3303
3304        if (error)
3305                log_error(lkb->lkb_resource->res_ls,
3306                          "ignore invalid message %d from %d %x %x %x %d",
3307                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3308                          lkb->lkb_flags, lkb->lkb_nodeid);
3309        return error;
3310}
3311
3312static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3313{
3314        struct dlm_lkb *lkb;
3315        struct dlm_rsb *r;
3316        int error, namelen;
3317
3318        error = create_lkb(ls, &lkb);
3319        if (error)
3320                goto fail;
3321
3322        receive_flags(lkb, ms);
3323        lkb->lkb_flags |= DLM_IFL_MSTCPY;
3324        error = receive_request_args(ls, lkb, ms);
3325        if (error) {
3326                __put_lkb(ls, lkb);
3327                goto fail;
3328        }
3329
3330        namelen = receive_extralen(ms);
3331
3332        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3333        if (error) {
3334                __put_lkb(ls, lkb);
3335                goto fail;
3336        }
3337
3338        lock_rsb(r);
3339
3340        attach_lkb(r, lkb);
3341        error = do_request(r, lkb);
3342        send_request_reply(r, lkb, error);
3343        do_request_effects(r, lkb, error);
3344
3345        unlock_rsb(r);
3346        put_rsb(r);
3347
3348        if (error == -EINPROGRESS)
3349                error = 0;
3350        if (error)
3351                dlm_put_lkb(lkb);
3352        return;
3353
3354 fail:
3355        setup_stub_lkb(ls, ms);
3356        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3357}
3358
3359static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3360{
3361        struct dlm_lkb *lkb;
3362        struct dlm_rsb *r;
3363        int error, reply = 1;
3364
3365        error = find_lkb(ls, ms->m_remid, &lkb);
3366        if (error)
3367                goto fail;
3368
3369        r = lkb->lkb_resource;
3370
3371        hold_rsb(r);
3372        lock_rsb(r);
3373
3374        error = validate_message(lkb, ms);
3375        if (error)
3376                goto out;
3377
3378        receive_flags(lkb, ms);
3379
3380        error = receive_convert_args(ls, lkb, ms);
3381        if (error) {
3382                send_convert_reply(r, lkb, error);
3383                goto out;
3384        }
3385
3386        reply = !down_conversion(lkb);
3387
3388        error = do_convert(r, lkb);
3389        if (reply)
3390                send_convert_reply(r, lkb, error);
3391        do_convert_effects(r, lkb, error);
3392 out:
3393        unlock_rsb(r);
3394        put_rsb(r);
3395        dlm_put_lkb(lkb);
3396        return;
3397
3398 fail:
3399        setup_stub_lkb(ls, ms);
3400        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3401}
3402
3403static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3404{
3405        struct dlm_lkb *lkb;
3406        struct dlm_rsb *r;
3407        int error;
3408
3409        error = find_lkb(ls, ms->m_remid, &lkb);
3410        if (error)
3411                goto fail;
3412
3413        r = lkb->lkb_resource;
3414
3415        hold_rsb(r);
3416        lock_rsb(r);
3417
3418        error = validate_message(lkb, ms);
3419        if (error)
3420                goto out;
3421
3422        receive_flags(lkb, ms);
3423
3424        error = receive_unlock_args(ls, lkb, ms);
3425        if (error) {
3426                send_unlock_reply(r, lkb, error);
3427                goto out;
3428        }
3429
3430        error = do_unlock(r, lkb);
3431        send_unlock_reply(r, lkb, error);
3432        do_unlock_effects(r, lkb, error);
3433 out:
3434        unlock_rsb(r);
3435        put_rsb(r);
3436        dlm_put_lkb(lkb);
3437        return;
3438
3439 fail:
3440        setup_stub_lkb(ls, ms);
3441        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3442}
3443
3444static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3445{
3446        struct dlm_lkb *lkb;
3447        struct dlm_rsb *r;
3448        int error;
3449
3450        error = find_lkb(ls, ms->m_remid, &lkb);
3451        if (error)
3452                goto fail;
3453
3454        receive_flags(lkb, ms);
3455
3456        r = lkb->lkb_resource;
3457
3458        hold_rsb(r);
3459        lock_rsb(r);
3460
3461        error = validate_message(lkb, ms);
3462        if (error)
3463                goto out;
3464
3465        error = do_cancel(r, lkb);
3466        send_cancel_reply(r, lkb, error);
3467        do_cancel_effects(r, lkb, error);
3468 out:
3469        unlock_rsb(r);
3470        put_rsb(r);
3471        dlm_put_lkb(lkb);
3472        return;
3473
3474 fail:
3475        setup_stub_lkb(ls, ms);
3476        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3477}
3478
3479static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3480{
3481        struct dlm_lkb *lkb;
3482        struct dlm_rsb *r;
3483        int error;
3484
3485        error = find_lkb(ls, ms->m_remid, &lkb);
3486        if (error) {
3487                log_debug(ls, "receive_grant from %d no lkb %x",
3488                          ms->m_header.h_nodeid, ms->m_remid);
3489                return;
3490        }
3491
3492        r = lkb->lkb_resource;
3493
3494        hold_rsb(r);
3495        lock_rsb(r);
3496
3497        error = validate_message(lkb, ms);
3498        if (error)
3499                goto out;
3500
3501        receive_flags_reply(lkb, ms);
3502        if (is_altmode(lkb))
3503                munge_altmode(lkb, ms);
3504        grant_lock_pc(r, lkb, ms);
3505        queue_cast(r, lkb, 0);
3506 out:
3507        unlock_rsb(r);
3508        put_rsb(r);
3509        dlm_put_lkb(lkb);
3510}
3511
3512static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3513{
3514        struct dlm_lkb *lkb;
3515        struct dlm_rsb *r;
3516        int error;
3517
3518        error = find_lkb(ls, ms->m_remid, &lkb);
3519        if (error) {
3520                log_debug(ls, "receive_bast from %d no lkb %x",
3521                          ms->m_header.h_nodeid, ms->m_remid);
3522                return;
3523        }
3524
3525        r = lkb->lkb_resource;
3526
3527        hold_rsb(r);
3528        lock_rsb(r);
3529
3530        error = validate_message(lkb, ms);
3531        if (error)
3532                goto out;
3533
3534        queue_bast(r, lkb, ms->m_bastmode);
3535 out:
3536        unlock_rsb(r);
3537        put_rsb(r);
3538        dlm_put_lkb(lkb);
3539}
3540
3541static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3542{
3543        int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3544
3545        from_nodeid = ms->m_header.h_nodeid;
3546        our_nodeid = dlm_our_nodeid();
3547
3548        len = receive_extralen(ms);
3549
3550        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3551        if (dir_nodeid != our_nodeid) {
3552                log_error(ls, "lookup dir_nodeid %d from %d",
3553                          dir_nodeid, from_nodeid);
3554                error = -EINVAL;
3555                ret_nodeid = -1;
3556                goto out;
3557        }
3558
3559        error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3560
3561        /* Optimization: we're master so treat lookup as a request */
3562        if (!error && ret_nodeid == our_nodeid) {
3563                receive_request(ls, ms);
3564                return;
3565        }
3566 out:
3567        send_lookup_reply(ls, ms, ret_nodeid, error);
3568}
3569
3570static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3571{
3572        int len, dir_nodeid, from_nodeid;
3573
3574        from_nodeid = ms->m_header.h_nodeid;
3575
3576        len = receive_extralen(ms);
3577
3578        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3579        if (dir_nodeid != dlm_our_nodeid()) {
3580                log_error(ls, "remove dir entry dir_nodeid %d from %d",
3581                          dir_nodeid, from_nodeid);
3582                return;
3583        }
3584
3585        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3586}
3587
3588static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3589{
3590        do_purge(ls, ms->m_nodeid, ms->m_pid);
3591}
3592
3593static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3594{
3595        struct dlm_lkb *lkb;
3596        struct dlm_rsb *r;
3597        int error, mstype, result;
3598
3599        error = find_lkb(ls, ms->m_remid, &lkb);
3600        if (error) {
3601                log_debug(ls, "receive_request_reply from %d no lkb %x",
3602                          ms->m_header.h_nodeid, ms->m_remid);
3603                return;
3604        }
3605
3606        r = lkb->lkb_resource;
3607        hold_rsb(r);
3608        lock_rsb(r);
3609
3610        error = validate_message(lkb, ms);
3611        if (error)
3612                goto out;
3613
3614        mstype = lkb->lkb_wait_type;
3615        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3616        if (error)
3617                goto out;
3618
3619        /* Optimization: the dir node was also the master, so it took our
3620           lookup as a request and sent request reply instead of lookup reply */
3621        if (mstype == DLM_MSG_LOOKUP) {
3622                r->res_nodeid = ms->m_header.h_nodeid;
3623                lkb->lkb_nodeid = r->res_nodeid;
3624        }
3625
3626        /* this is the value returned from do_request() on the master */
3627        result = ms->m_result;
3628
3629        switch (result) {
3630        case -EAGAIN:
3631                /* request would block (be queued) on remote master */
3632                queue_cast(r, lkb, -EAGAIN);
3633                confirm_master(r, -EAGAIN);
3634                unhold_lkb(lkb); /* undoes create_lkb() */
3635                break;
3636
3637        case -EINPROGRESS:
3638        case 0:
3639                /* request was queued or granted on remote master */
3640                receive_flags_reply(lkb, ms);
3641                lkb->lkb_remid = ms->m_lkid;
3642                if (is_altmode(lkb))
3643                        munge_altmode(lkb, ms);
3644                if (result) {
3645                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
3646                        add_timeout(lkb);
3647                } else {
3648                        grant_lock_pc(r, lkb, ms);
3649                        queue_cast(r, lkb, 0);
3650                }
3651                confirm_master(r, result);
3652                break;
3653
3654        case -EBADR:
3655        case -ENOTBLK:
3656                /* find_rsb failed to find rsb or rsb wasn't master */
3657                log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3658                          lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3659                r->res_nodeid = -1;
3660                lkb->lkb_nodeid = -1;
3661
3662                if (is_overlap(lkb)) {
3663                        /* we'll ignore error in cancel/unlock reply */
3664                        queue_cast_overlap(r, lkb);
3665                        confirm_master(r, result);
3666                        unhold_lkb(lkb); /* undoes create_lkb() */
3667                } else
3668                        _request_lock(r, lkb);
3669                break;
3670
3671        default:
3672                log_error(ls, "receive_request_reply %x error %d",
3673                          lkb->lkb_id, result);
3674        }
3675
3676        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3677                log_debug(ls, "receive_request_reply %x result %d unlock",
3678                          lkb->lkb_id, result);
3679                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3680                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3681                send_unlock(r, lkb);
3682        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3683                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3684                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3685                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3686                send_cancel(r, lkb);
3687        } else {
3688                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3689                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3690        }
3691 out:
3692        unlock_rsb(r);
3693        put_rsb(r);
3694        dlm_put_lkb(lkb);
3695}
3696
3697static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3698                                    struct dlm_message *ms)
3699{
3700        /* this is the value returned from do_convert() on the master */
3701        switch (ms->m_result) {
3702        case -EAGAIN:
3703                /* convert would block (be queued) on remote master */
3704                queue_cast(r, lkb, -EAGAIN);
3705                break;
3706
3707        case -EDEADLK:
3708                receive_flags_reply(lkb, ms);
3709                revert_lock_pc(r, lkb);
3710                queue_cast(r, lkb, -EDEADLK);
3711                break;
3712
3713        case -EINPROGRESS:
3714                /* convert was queued on remote master */
3715                receive_flags_reply(lkb, ms);
3716                if (is_demoted(lkb))
3717                        munge_demoted(lkb);
3718                del_lkb(r, lkb);
3719                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3720                add_timeout(lkb);
3721                break;
3722
3723        case 0:
3724                /* convert was granted on remote master */
3725                receive_flags_reply(lkb, ms);
3726                if (is_demoted(lkb))
3727                        munge_demoted(lkb);
3728                grant_lock_pc(r, lkb, ms);
3729                queue_cast(r, lkb, 0);
3730                break;
3731
3732        default:
3733                log_error(r->res_ls, "receive_convert_reply %x error %d",
3734                          lkb->lkb_id, ms->m_result);
3735        }
3736}
3737
3738static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3739{
3740        struct dlm_rsb *r = lkb->lkb_resource;
3741        int error;
3742
3743        hold_rsb(r);
3744        lock_rsb(r);
3745
3746        error = validate_message(lkb, ms);
3747        if (error)
3748                goto out;
3749
3750        /* stub reply can happen with waiters_mutex held */
3751        error = remove_from_waiters_ms(lkb, ms);
3752        if (error)
3753                goto out;
3754
3755        __receive_convert_reply(r, lkb, ms);
3756 out:
3757        unlock_rsb(r);
3758        put_rsb(r);
3759}
3760
3761static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3762{
3763        struct dlm_lkb *lkb;
3764        int error;
3765
3766        error = find_lkb(ls, ms->m_remid, &lkb);
3767        if (error) {
3768                log_debug(ls, "receive_convert_reply from %d no lkb %x",
3769                          ms->m_header.h_nodeid, ms->m_remid);
3770                return;
3771        }
3772
3773        _receive_convert_reply(lkb, ms);
3774        dlm_put_lkb(lkb);
3775}
3776
3777static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3778{
3779        struct dlm_rsb *r = lkb->lkb_resource;
3780        int error;
3781
3782        hold_rsb(r);
3783        lock_rsb(r);
3784
3785        error = validate_message(lkb, ms);
3786        if (error)
3787                goto out;
3788
3789        /* stub reply can happen with waiters_mutex held */
3790        error = remove_from_waiters_ms(lkb, ms);
3791        if (error)
3792                goto out;
3793
3794        /* this is the value returned from do_unlock() on the master */
3795
3796        switch (ms->m_result) {
3797        case -DLM_EUNLOCK:
3798                receive_flags_reply(lkb, ms);
3799                remove_lock_pc(r, lkb);
3800                queue_cast(r, lkb, -DLM_EUNLOCK);
3801                break;
3802        case -ENOENT:
3803                break;
3804        default:
3805                log_error(r->res_ls, "receive_unlock_reply %x error %d",
3806                          lkb->lkb_id, ms->m_result);
3807        }
3808 out:
3809        unlock_rsb(r);
3810        put_rsb(r);
3811}
3812
3813static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3814{
3815        struct dlm_lkb *lkb;
3816        int error;
3817
3818        error = find_lkb(ls, ms->m_remid, &lkb);
3819        if (error) {
3820                log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3821                          ms->m_header.h_nodeid, ms->m_remid);
3822                return;
3823        }
3824
3825        _receive_unlock_reply(lkb, ms);
3826        dlm_put_lkb(lkb);
3827}
3828
3829static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3830{
3831        struct dlm_rsb *r = lkb->lkb_resource;
3832        int error;
3833
3834        hold_rsb(r);
3835        lock_rsb(r);
3836
3837        error = validate_message(lkb, ms);
3838        if (error)
3839                goto out;
3840
3841        /* stub reply can happen with waiters_mutex held */
3842        error = remove_from_waiters_ms(lkb, ms);
3843        if (error)
3844                goto out;
3845
3846        /* this is the value returned from do_cancel() on the master */
3847
3848        switch (ms->m_result) {
3849        case -DLM_ECANCEL:
3850                receive_flags_reply(lkb, ms);
3851                revert_lock_pc(r, lkb);
3852                queue_cast(r, lkb, -DLM_ECANCEL);
3853                break;
3854        case 0:
3855                break;
3856        default:
3857                log_error(r->res_ls, "receive_cancel_reply %x error %d",
3858                          lkb->lkb_id, ms->m_result);
3859        }
3860 out:
3861        unlock_rsb(r);
3862        put_rsb(r);
3863}
3864
3865static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3866{
3867        struct dlm_lkb *lkb;
3868        int error;
3869
3870        error = find_lkb(ls, ms->m_remid, &lkb);
3871        if (error) {
3872                log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3873                          ms->m_header.h_nodeid, ms->m_remid);
3874                return;
3875        }
3876
3877        _receive_cancel_reply(lkb, ms);
3878        dlm_put_lkb(lkb);
3879}
3880
3881static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3882{
3883        struct dlm_lkb *lkb;
3884        struct dlm_rsb *r;
3885        int error, ret_nodeid;
3886
3887        error = find_lkb(ls, ms->m_lkid, &lkb);
3888        if (error) {
3889                log_error(ls, "receive_lookup_reply no lkb");
3890                return;
3891        }
3892
3893        /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3894           FIXME: will a non-zero error ever be returned? */
3895
3896        r = lkb->lkb_resource;
3897        hold_rsb(r);
3898        lock_rsb(r);
3899
3900        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3901        if (error)
3902                goto out;
3903
3904        ret_nodeid = ms->m_nodeid;
3905        if (ret_nodeid == dlm_our_nodeid()) {
3906                r->res_nodeid = 0;
3907                ret_nodeid = 0;
3908                r->res_first_lkid = 0;
3909        } else {
3910                /* set_master() will copy res_nodeid to lkb_nodeid */
3911                r->res_nodeid = ret_nodeid;
3912        }
3913
3914        if (is_overlap(lkb)) {
3915                log_debug(ls, "receive_lookup_reply %x unlock %x",
3916                          lkb->lkb_id, lkb->lkb_flags);
3917                queue_cast_overlap(r, lkb);
3918                unhold_lkb(lkb); /* undoes create_lkb() */
3919                goto out_list;
3920        }
3921
3922        _request_lock(r, lkb);
3923
3924 out_list:
3925        if (!ret_nodeid)
3926                process_lookup_list(r);
3927 out:
3928        unlock_rsb(r);
3929        put_rsb(r);
3930        dlm_put_lkb(lkb);
3931}
3932
3933static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3934{
3935        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3936                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3937                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3938                          ms->m_remid, ms->m_result);
3939                return;
3940        }
3941
3942        switch (ms->m_type) {
3943
3944        /* messages sent to a master node */
3945
3946        case DLM_MSG_REQUEST:
3947                receive_request(ls, ms);
3948                break;
3949
3950        case DLM_MSG_CONVERT:
3951                receive_convert(ls, ms);
3952                break;
3953
3954        case DLM_MSG_UNLOCK:
3955                receive_unlock(ls, ms);
3956                break;
3957
3958        case DLM_MSG_CANCEL:
3959                receive_cancel(ls, ms);
3960                break;
3961
3962        /* messages sent from a master node (replies to above) */
3963
3964        case DLM_MSG_REQUEST_REPLY:
3965                receive_request_reply(ls, ms);
3966                break;
3967
3968        case DLM_MSG_CONVERT_REPLY:
3969                receive_convert_reply(ls, ms);
3970                break;
3971
3972        case DLM_MSG_UNLOCK_REPLY:
3973                receive_unlock_reply(ls, ms);
3974                break;
3975
3976        case DLM_MSG_CANCEL_REPLY:
3977                receive_cancel_reply(ls, ms);
3978                break;
3979
3980        /* messages sent from a master node (only two types of async msg) */
3981
3982        case DLM_MSG_GRANT:
3983                receive_grant(ls, ms);
3984                break;
3985
3986        case DLM_MSG_BAST:
3987                receive_bast(ls, ms);
3988                break;
3989
3990        /* messages sent to a dir node */
3991
3992        case DLM_MSG_LOOKUP:
3993                receive_lookup(ls, ms);
3994                break;
3995
3996        case DLM_MSG_REMOVE:
3997                receive_remove(ls, ms);
3998                break;
3999
4000        /* messages sent from a dir node (remove has no reply) */
4001
4002        case DLM_MSG_LOOKUP_REPLY:
4003                receive_lookup_reply(ls, ms);
4004                break;
4005
4006        /* other messages */
4007
4008        case DLM_MSG_PURGE:
4009                receive_purge(ls, ms);
4010                break;
4011
4012        default:
4013                log_error(ls, "unknown message type %d", ms->m_type);
4014        }
4015}
4016
/* While the lockspace is in recovery mode (locking stopped), normal messages
   are not processed but saved on the requestqueue, to be handled once
   recovery is done.  Outside of recovery we first wait for dlm_recoverd to
   drain any saved messages off the requestqueue and only then process the
   new one.  That wait matters right after recovery completes, when we move
   from saving all messages, to processing the saved ones, to processing new
   messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                                int nodeid)
{
        if (!dlm_locking_stopped(ls)) {
                dlm_wait_requestqueue(ls);
                _receive_message(ls, ms);
                return;
        }

        /* in recovery: defer the message */
        dlm_add_requestqueue(ls, nodeid, ms);
}
4035
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message is dispatched directly, without the
   locking-stopped check done for newly arriving messages. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
        _receive_message(ls, ms);
}
4043
4044/* This is called by the midcomms layer when something is received for
4045   the lockspace.  It could be either a MSG (normal message sent as part of
4046   standard locking activity) or an RCOM (recovery message sent as part of
4047   lockspace recovery). */
4048
4049void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4050{
4051        struct dlm_header *hd = &p->header;
4052        struct dlm_ls *ls;
4053        int type = 0;
4054
4055        switch (hd->h_cmd) {
4056        case DLM_MSG:
4057                dlm_message_in(&p->message);
4058                type = p->message.m_type;
4059                break;
4060        case DLM_RCOM:
4061                dlm_rcom_in(&p->rcom);
4062                type = p->rcom.rc_type;
4063                break;
4064        default:
4065                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4066                return;
4067        }
4068
4069        if (hd->h_nodeid != nodeid) {
4070                log_print("invalid h_nodeid %d from %d lockspace %x",
4071                          hd->h_nodeid, nodeid, hd->h_lockspace);
4072                return;
4073        }
4074
4075        ls = dlm_find_lockspace_global(hd->h_lockspace);
4076        if (!ls) {
4077                if (dlm_config.ci_log_debug)
4078                        log_print("invalid lockspace %x from %d cmd %d type %d",
4079                                  hd->h_lockspace, nodeid, hd->h_cmd, type);
4080
4081                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4082                        dlm_send_ls_not_ready(nodeid, &p->rcom);
4083                return;
4084        }
4085
4086        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4087           be inactive (in this ls) before transitioning to recovery mode */
4088
4089        down_read(&ls->ls_recv_active);
4090        if (hd->h_cmd == DLM_MSG)
4091                dlm_receive_message(ls, &p->message, nodeid);
4092        else
4093                dlm_receive_rcom(ls, &p->rcom, nodeid);
4094        up_read(&ls->ls_recv_active);
4095
4096        dlm_put_lockspace(ls);
4097}
4098
4099static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100                                   struct dlm_message *ms_stub)
4101{
4102        if (middle_conversion(lkb)) {
4103                hold_lkb(lkb);
4104                memset(ms_stub, 0, sizeof(struct dlm_message));
4105                ms_stub->m_flags = DLM_IFL_STUB_MS;
4106                ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4107                ms_stub->m_result = -EINPROGRESS;
4108                ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4109                _receive_convert_reply(lkb, ms_stub);
4110
4111                /* Same special case as in receive_rcom_lock_args() */
4112                lkb->lkb_grmode = DLM_LOCK_IV;
4113                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4114                unhold_lkb(lkb);
4115
4116        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4117                lkb->lkb_flags |= DLM_IFL_RESEND;
4118        }
4119
4120        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4121           conversions are async; there's no reply from the remote master */
4122}
4123
4124/* A waiting lkb needs recovery if the master node has failed, or
4125   the master node is changing (only when no directory is used) */
4126
4127static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4128{
4129        if (dlm_is_removed(ls, lkb->lkb_nodeid))
4130                return 1;
4131
4132        if (!dlm_no_directory(ls))
4133                return 0;
4134
4135        if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4136                return 1;
4137
4138        return 0;
4139}
4140
4141/* Recovery for locks that are waiting for replies from nodes that are now
4142   gone.  We can just complete unlocks and cancels by faking a reply from the
4143   dead node.  Requests and up-conversions we flag to be resent after
4144   recovery.  Down-conversions can just be completed with a fake reply like
4145   unlocks.  Conversions between PR and CW need special attention. */
4146
4147void dlm_recover_waiters_pre(struct dlm_ls *ls)
4148{
4149        struct dlm_lkb *lkb, *safe;
4150        struct dlm_message *ms_stub;
4151        int wait_type, stub_unlock_result, stub_cancel_result;
4152
4153        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4154        if (!ms_stub) {
4155                log_error(ls, "dlm_recover_waiters_pre no mem");
4156                return;
4157        }
4158
4159        mutex_lock(&ls->ls_waiters_mutex);
4160
4161        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4162
4163                /* exclude debug messages about unlocks because there can be so
4164                   many and they aren't very interesting */
4165
4166                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4167                        log_debug(ls, "recover_waiter %x nodeid %d "
4168                                  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4169                                  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4170                }
4171
4172                /* all outstanding lookups, regardless of destination  will be
4173                   resent after recovery is done */
4174
4175                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4176                        lkb->lkb_flags |= DLM_IFL_RESEND;
4177                        continue;
4178                }
4179
4180                if (!waiter_needs_recovery(ls, lkb))
4181                        continue;
4182
4183                wait_type = lkb->lkb_wait_type;
4184                stub_unlock_result = -DLM_EUNLOCK;
4185                stub_cancel_result = -DLM_ECANCEL;
4186
4187                /* Main reply may have been received leaving a zero wait_type,
4188                   but a reply for the overlapping op may not have been
4189                   received.  In that case we need to fake the appropriate
4190                   reply for the overlap op. */
4191
4192                if (!wait_type) {
4193                        if (is_overlap_cancel(lkb)) {
4194                                wait_type = DLM_MSG_CANCEL;
4195                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4196                                        stub_cancel_result = 0;
4197                        }
4198                        if (is_overlap_unlock(lkb)) {
4199                                wait_type = DLM_MSG_UNLOCK;
4200                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4201                                        stub_unlock_result = -ENOENT;
4202                        }
4203
4204                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
4205                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
4206                                  stub_cancel_result, stub_unlock_result);
4207                }
4208
4209                switch (wait_type) {
4210
4211                case DLM_MSG_REQUEST:
4212                        lkb->lkb_flags |= DLM_IFL_RESEND;
4213                        break;
4214
4215                case DLM_MSG_CONVERT:
4216                        recover_convert_waiter(ls, lkb, ms_stub);
4217                        break;
4218
4219                case DLM_MSG_UNLOCK:
4220                        hold_lkb(lkb);
4221                        memset(ms_stub, 0, sizeof(struct dlm_message));
4222                        ms_stub->m_flags = DLM_IFL_STUB_MS;
4223                        ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4224                        ms_stub->m_result = stub_unlock_result;
4225                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4226                        _receive_unlock_reply(lkb, ms_stub);
4227                        dlm_put_lkb(lkb);
4228                        break;
4229
4230                case DLM_MSG_CANCEL:
4231                        hold_lkb(lkb);
4232                        memset(ms_stub, 0, sizeof(struct dlm_message));
4233                        ms_stub->m_flags = DLM_IFL_STUB_MS;
4234                        ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4235                        ms_stub->m_result = stub_cancel_result;
4236                        ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4237                        _receive_cancel_reply(lkb, ms_stub);
4238                        dlm_put_lkb(lkb);
4239                        break;
4240
4241                default:
4242                        log_error(ls, "invalid lkb wait_type %d %d",
4243                                  lkb->lkb_wait_type, wait_type);
4244                }
4245                schedule();
4246        }
4247        mutex_unlock(&ls->ls_waiters_mutex);
4248        kfree(ms_stub);
4249}
4250
4251static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4252{
4253        struct dlm_lkb *lkb;
4254        int found = 0;
4255
4256        mutex_lock(&ls->ls_waiters_mutex);
4257        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4258                if (lkb->lkb_flags & DLM_IFL_RESEND) {
4259                        hold_lkb(lkb);
4260                        found = 1;
4261                        break;
4262                }
4263        }
4264        mutex_unlock(&ls->ls_waiters_mutex);
4265
4266        if (!found)
4267                lkb = NULL;
4268        return lkb;
4269}
4270
4271/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4272   master or dir-node for r.  Processing the lkb may result in it being placed
4273   back on waiters. */
4274
4275/* We do this after normal locking has been enabled and any saved messages
4276   (in requestqueue) have been processed.  We should be confident that at
4277   this point we won't get or process a reply to any of these waiting
4278   operations.  But, new ops may be coming in on the rsbs/locks here from
4279   userspace or remotely. */
4280
4281/* there may have been an overlap unlock/cancel prior to recovery or after
4282   recovery.  if before, the lkb may still have a pos wait_count; if after, the
4283   overlap flag would just have been set and nothing new sent.  we can be
4284   confident here than any replies to either the initial op or overlap ops
4285   prior to recovery have been received. */
4286
4287int dlm_recover_waiters_post(struct dlm_ls *ls)
4288{
4289        struct dlm_lkb *lkb;
4290        struct dlm_rsb *r;
4291        int error = 0, mstype, err, oc, ou;
4292
4293        while (1) {
4294                if (dlm_locking_stopped(ls)) {
4295                        log_debug(ls, "recover_waiters_post aborted");
4296                        error = -EINTR;
4297                        break;
4298                }
4299
4300                lkb = find_resend_waiter(ls);
4301                if (!lkb)
4302                        break;
4303
4304                r = lkb->lkb_resource;
4305                hold_rsb(r);
4306                lock_rsb(r);
4307
4308                mstype = lkb->lkb_wait_type;
4309                oc = is_overlap_cancel(lkb);
4310                ou = is_overlap_unlock(lkb);
4311                err = 0;
4312
4313                log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4314                          lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4315
4316                /* At this point we assume that we won't get a reply to any
4317                   previous op or overlap op on this lock.  First, do a big
4318                   remove_from_waiters() for all previous ops. */
4319
4320                lkb->lkb_flags &= ~DLM_IFL_RESEND;
4321                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4322                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4323                lkb->lkb_wait_type = 0;
4324                lkb->lkb_wait_count = 0;
4325                mutex_lock(&ls->ls_waiters_mutex);
4326                list_del_init(&lkb->lkb_wait_reply);
4327                mutex_unlock(&ls->ls_waiters_mutex);
4328                unhold_lkb(lkb); /* for waiters list */
4329
4330                if (oc || ou) {
4331                        /* do an unlock or cancel instead of resending */
4332                        switch (mstype) {
4333                        case DLM_MSG_LOOKUP:
4334                        case DLM_MSG_REQUEST:
4335                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4336                                                        -DLM_ECANCEL);
4337                                unhold_lkb(lkb); /* undoes create_lkb() */
4338                                break;
4339                        case DLM_MSG_CONVERT:
4340                                if (oc) {
4341                                        queue_cast(r, lkb, -DLM_ECANCEL);
4342                                } else {
4343                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4344                                        _unlock_lock(r, lkb);
4345                                }
4346                                break;
4347                        default:
4348                                err = 1;
4349                        }
4350                } else {
4351                        switch (mstype) {
4352                        case DLM_MSG_LOOKUP:
4353                        case DLM_MSG_REQUEST:
4354                                _request_lock(r, lkb);
4355                                if (is_master(r))
4356                                        confirm_master(r, 0);
4357                                break;
4358                        case DLM_MSG_CONVERT:
4359                                _convert_lock(r, lkb);
4360                                break;
4361                        default:
4362                                err = 1;
4363                        }
4364                }
4365
4366                if (err)
4367                        log_error(ls, "recover_waiters_post %x %d %x %d %d",
4368                                  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4369                unlock_rsb(r);
4370                put_rsb(r);
4371                dlm_put_lkb(lkb);
4372        }
4373
4374        return error;
4375}
4376
4377static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4378                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4379{
4380        struct dlm_ls *ls = r->res_ls;
4381        struct dlm_lkb *lkb, *safe;
4382
4383        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4384                if (test(ls, lkb)) {
4385                        rsb_set_flag(r, RSB_LOCKS_PURGED);
4386                        del_lkb(r, lkb);
4387                        /* this put should free the lkb */
4388                        if (!dlm_put_lkb(lkb))
4389                                log_error(ls, "purged lkb not released");
4390                }
4391        }
4392}
4393
4394static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4395{
4396        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4397}
4398
/* purge_queue() test: selects every master-copy lkb; ls is unused. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        return is_master_copy(lkb);
}
4403
4404static void purge_dead_locks(struct dlm_rsb *r)
4405{
4406        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4407        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4408        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4409}
4410
4411void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4412{
4413        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4414        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4415        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4416}
4417
4418/* Get rid of locks held by nodes that are gone. */
4419
4420int dlm_purge_locks(struct dlm_ls *ls)
4421{
4422        struct dlm_rsb *r;
4423
4424        log_debug(ls, "dlm_purge_locks");
4425
4426        down_write(&ls->ls_root_sem);
4427        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4428                hold_rsb(r);
4429                lock_rsb(r);
4430                if (is_master(r))
4431                        purge_dead_locks(r);
4432                unlock_rsb(r);
4433                unhold_rsb(r);
4434
4435                schedule();
4436        }
4437        up_write(&ls->ls_root_sem);
4438
4439        return 0;
4440}
4441
4442static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4443{
4444        struct dlm_rsb *r, *r_ret = NULL;
4445
4446        spin_lock(&ls->ls_rsbtbl[bucket].lock);
4447        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4448                if (!rsb_flag(r, RSB_LOCKS_PURGED))
4449                        continue;
4450                hold_rsb(r);
4451                rsb_clear_flag(r, RSB_LOCKS_PURGED);
4452                r_ret = r;
4453                break;
4454        }
4455        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4456        return r_ret;
4457}
4458
4459void dlm_grant_after_purge(struct dlm_ls *ls)
4460{
4461        struct dlm_rsb *r;
4462        int bucket = 0;
4463
4464        while (1) {
4465                r = find_purged_rsb(ls, bucket);
4466                if (!r) {
4467                        if (bucket == ls->ls_rsbtbl_size - 1)
4468                                break;
4469                        bucket++;
4470                        continue;
4471                }
4472                lock_rsb(r);
4473                if (is_master(r)) {
4474                        grant_pending_locks(r);
4475                        confirm_master(r, 0);
4476                }
4477                unlock_rsb(r);
4478                put_rsb(r);
4479                schedule();
4480        }
4481}
4482
4483static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4484                                         uint32_t remid)
4485{
4486        struct dlm_lkb *lkb;
4487
4488        list_for_each_entry(lkb, head, lkb_statequeue) {
4489                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4490                        return lkb;
4491        }
4492        return NULL;
4493}
4494
4495static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4496                                    uint32_t remid)
4497{
4498        struct dlm_lkb *lkb;
4499
4500        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4501        if (lkb)
4502                return lkb;
4503        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4504        if (lkb)
4505                return lkb;
4506        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4507        if (lkb)
4508                return lkb;
4509        return NULL;
4510}
4511
4512/* needs at least dlm_rcom + rcom_lock */
4513static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4514                                  struct dlm_rsb *r, struct dlm_rcom *rc)
4515{
4516        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4517
4518        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4519        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4520        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4521        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4522        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4523        lkb->lkb_flags |= DLM_IFL_MSTCPY;
4524        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4525        lkb->lkb_rqmode = rl->rl_rqmode;
4526        lkb->lkb_grmode = rl->rl_grmode;
4527        /* don't set lkb_status because add_lkb wants to itself */
4528
4529        lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4530        lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4531
4532        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4533                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4534                         sizeof(struct rcom_lock);
4535                if (lvblen > ls->ls_lvblen)
4536                        return -EINVAL;
4537                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4538                if (!lkb->lkb_lvbptr)
4539                        return -ENOMEM;
4540                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4541        }
4542
4543        /* Conversions between PR and CW (middle modes) need special handling.
4544           The real granted mode of these converting locks cannot be determined
4545           until all locks have been rebuilt on the rsb (recover_conversion) */
4546
4547        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4548            middle_conversion(lkb)) {
4549                rl->rl_status = DLM_LKSTS_CONVERT;
4550                lkb->lkb_grmode = DLM_LOCK_IV;
4551                rsb_set_flag(r, RSB_RECOVER_CONVERT);
4552        }
4553
4554        return 0;
4555}
4556
4557/* This lkb may have been recovered in a previous aborted recovery so we need
4558   to check if the rsb already has an lkb with the given remote nodeid/lkid.
4559   If so we just send back a standard reply.  If not, we create a new lkb with
4560   the given values and send back our lkid.  We send back our lkid by sending
4561   back the rcom_lock struct we got but with the remid field filled in. */
4562
4563/* needs at least dlm_rcom + rcom_lock */
4564int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4565{
4566        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4567        struct dlm_rsb *r;
4568        struct dlm_lkb *lkb;
4569        int error;
4570
4571        if (rl->rl_parent_lkid) {
4572                error = -EOPNOTSUPP;
4573                goto out;
4574        }
4575
4576        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4577                         R_MASTER, &r);
4578        if (error)
4579                goto out;
4580
4581        lock_rsb(r);
4582
4583        lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4584        if (lkb) {
4585                error = -EEXIST;
4586                goto out_remid;
4587        }
4588
4589        error = create_lkb(ls, &lkb);
4590        if (error)
4591                goto out_unlock;
4592
4593        error = receive_rcom_lock_args(ls, lkb, r, rc);
4594        if (error) {
4595                __put_lkb(ls, lkb);
4596                goto out_unlock;
4597        }
4598
4599        attach_lkb(r, lkb);
4600        add_lkb(r, lkb, rl->rl_status);
4601        error = 0;
4602
4603 out_remid:
4604        /* this is the new value returned to the lock holder for
4605           saving in its process-copy lkb */
4606        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4607
4608 out_unlock:
4609        unlock_rsb(r);
4610        put_rsb(r);
4611 out:
4612        if (error)
4613                log_debug(ls, "recover_master_copy %d %x", error,
4614                          le32_to_cpu(rl->rl_lkid));
4615        rl->rl_result = cpu_to_le32(error);
4616        return error;
4617}
4618
4619/* needs at least dlm_rcom + rcom_lock */
4620int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4621{
4622        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4623        struct dlm_rsb *r;
4624        struct dlm_lkb *lkb;
4625        int error;
4626
4627        error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4628        if (error) {
4629                log_error(ls, "recover_process_copy no lkid %x",
4630                                le32_to_cpu(rl->rl_lkid));
4631                return error;
4632        }
4633
4634        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4635
4636        error = le32_to_cpu(rl->rl_result);
4637
4638        r = lkb->lkb_resource;
4639        hold_rsb(r);
4640        lock_rsb(r);
4641
4642        switch (error) {
4643        case -EBADR:
4644                /* There's a chance the new master received our lock before
4645                   dlm_recover_master_reply(), this wouldn't happen if we did
4646                   a barrier between recover_masters and recover_locks. */
4647                log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4648                          (unsigned long)r, r->res_name);
4649                dlm_send_rcom_lock(r, lkb);
4650                goto out;
4651        case -EEXIST:
4652                log_debug(ls, "master copy exists %x", lkb->lkb_id);
4653                /* fall through */
4654        case 0:
4655                lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4656                break;
4657        default:
4658                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4659                          error, lkb->lkb_id);
4660        }
4661
4662        /* an ack for dlm_recover_locks() which waits for replies from
4663           all the locks it sends to new masters */
4664        dlm_recovered_lock(r);
4665 out:
4666        unlock_rsb(r);
4667        put_rsb(r);
4668        dlm_put_lkb(lkb);
4669
4670        return 0;
4671}
4672
4673int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4674                     int mode, uint32_t flags, void *name, unsigned int namelen,
4675                     unsigned long timeout_cs)
4676{
4677        struct dlm_lkb *lkb;
4678        struct dlm_args args;
4679        int error;
4680
4681        dlm_lock_recovery(ls);
4682
4683        error = create_lkb(ls, &lkb);
4684        if (error) {
4685                kfree(ua);
4686                goto out;
4687        }
4688
4689        if (flags & DLM_LKF_VALBLK) {
4690                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4691                if (!ua->lksb.sb_lvbptr) {
4692                        kfree(ua);
4693                        __put_lkb(ls, lkb);
4694                        error = -ENOMEM;
4695                        goto out;
4696                }
4697        }
4698
4699        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4700           When DLM_IFL_USER is set, the dlm knows that this is a userspace
4701           lock and that lkb_astparam is the dlm_user_args structure. */
4702
4703        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4704                              fake_astfn, ua, fake_bastfn, &args);
4705        lkb->lkb_flags |= DLM_IFL_USER;
4706
4707        if (error) {
4708                __put_lkb(ls, lkb);
4709                goto out;
4710        }
4711
4712        error = request_lock(ls, lkb, name, namelen, &args);
4713
4714        switch (error) {
4715        case 0:
4716                break;
4717        case -EINPROGRESS:
4718                error = 0;
4719                break;
4720        case -EAGAIN:
4721                error = 0;
4722                /* fall through */
4723        default:
4724                __put_lkb(ls, lkb);
4725                goto out;
4726        }
4727
4728        /* add this new lkb to the per-process list of locks */
4729        spin_lock(&ua->proc->locks_spin);
4730        hold_lkb(lkb);
4731        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4732        spin_unlock(&ua->proc->locks_spin);
4733 out:
4734        dlm_unlock_recovery(ls);
4735        return error;
4736}
4737
4738int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4739                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4740                     unsigned long timeout_cs)
4741{
4742        struct dlm_lkb *lkb;
4743        struct dlm_args args;
4744        struct dlm_user_args *ua;
4745        int error;
4746
4747        dlm_lock_recovery(ls);
4748
4749        error = find_lkb(ls, lkid, &lkb);
4750        if (error)
4751                goto out;
4752
4753        /* user can change the params on its lock when it converts it, or
4754           add an lvb that didn't exist before */
4755
4756        ua = lkb->lkb_ua;
4757
4758        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4759                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4760                if (!ua->lksb.sb_lvbptr) {
4761                        error = -ENOMEM;
4762                        goto out_put;
4763                }
4764        }
4765        if (lvb_in && ua->lksb.sb_lvbptr)
4766                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4767
4768        ua->xid = ua_tmp->xid;
4769        ua->castparam = ua_tmp->castparam;
4770        ua->castaddr = ua_tmp->castaddr;
4771        ua->bastparam = ua_tmp->bastparam;
4772        ua->bastaddr = ua_tmp->bastaddr;
4773        ua->user_lksb = ua_tmp->user_lksb;
4774
4775        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4776                              fake_astfn, ua, fake_bastfn, &args);
4777        if (error)
4778                goto out_put;
4779
4780        error = convert_lock(ls, lkb, &args);
4781
4782        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4783                error = 0;
4784 out_put:
4785        dlm_put_lkb(lkb);
4786 out:
4787        dlm_unlock_recovery(ls);
4788        kfree(ua_tmp);
4789        return error;
4790}
4791
4792int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4793                    uint32_t flags, uint32_t lkid, char *lvb_in)
4794{
4795        struct dlm_lkb *lkb;
4796        struct dlm_args args;
4797        struct dlm_user_args *ua;
4798        int error;
4799
4800        dlm_lock_recovery(ls);
4801
4802        error = find_lkb(ls, lkid, &lkb);
4803        if (error)
4804                goto out;
4805
4806        ua = lkb->lkb_ua;
4807
4808        if (lvb_in && ua->lksb.sb_lvbptr)
4809                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4810        if (ua_tmp->castparam)
4811                ua->castparam = ua_tmp->castparam;
4812        ua->user_lksb = ua_tmp->user_lksb;
4813
4814        error = set_unlock_args(flags, ua, &args);
4815        if (error)
4816                goto out_put;
4817
4818        error = unlock_lock(ls, lkb, &args);
4819
4820        if (error == -DLM_EUNLOCK)
4821                error = 0;
4822        /* from validate_unlock_args() */
4823        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4824                error = 0;
4825        if (error)
4826                goto out_put;
4827
4828        spin_lock(&ua->proc->locks_spin);
4829        /* dlm_user_add_cb() may have already taken lkb off the proc list */
4830        if (!list_empty(&lkb->lkb_ownqueue))
4831                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4832        spin_unlock(&ua->proc->locks_spin);
4833 out_put:
4834        dlm_put_lkb(lkb);
4835 out:
4836        dlm_unlock_recovery(ls);
4837        kfree(ua_tmp);
4838        return error;
4839}
4840
4841int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4842                    uint32_t flags, uint32_t lkid)
4843{
4844        struct dlm_lkb *lkb;
4845        struct dlm_args args;
4846        struct dlm_user_args *ua;
4847        int error;
4848
4849        dlm_lock_recovery(ls);
4850
4851        error = find_lkb(ls, lkid, &lkb);
4852        if (error)
4853                goto out;
4854
4855        ua = lkb->lkb_ua;
4856        if (ua_tmp->castparam)
4857                ua->castparam = ua_tmp->castparam;
4858        ua->user_lksb = ua_tmp->user_lksb;
4859
4860        error = set_unlock_args(flags, ua, &args);
4861        if (error)
4862                goto out_put;
4863
4864        error = cancel_lock(ls, lkb, &args);
4865
4866        if (error == -DLM_ECANCEL)
4867                error = 0;
4868        /* from validate_unlock_args() */
4869        if (error == -EBUSY)
4870                error = 0;
4871 out_put:
4872        dlm_put_lkb(lkb);
4873 out:
4874        dlm_unlock_recovery(ls);
4875        kfree(ua_tmp);
4876        return error;
4877}
4878
4879int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4880{
4881        struct dlm_lkb *lkb;
4882        struct dlm_args args;
4883        struct dlm_user_args *ua;
4884        struct dlm_rsb *r;
4885        int error;
4886
4887        dlm_lock_recovery(ls);
4888
4889        error = find_lkb(ls, lkid, &lkb);
4890        if (error)
4891                goto out;
4892
4893        ua = lkb->lkb_ua;
4894
4895        error = set_unlock_args(flags, ua, &args);
4896        if (error)
4897                goto out_put;
4898
4899        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4900
4901        r = lkb->lkb_resource;
4902        hold_rsb(r);
4903        lock_rsb(r);
4904
4905        error = validate_unlock_args(lkb, &args);
4906        if (error)
4907                goto out_r;
4908        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4909
4910        error = _cancel_lock(r, lkb);
4911 out_r:
4912        unlock_rsb(r);
4913        put_rsb(r);
4914
4915        if (error == -DLM_ECANCEL)
4916                error = 0;
4917        /* from validate_unlock_args() */
4918        if (error == -EBUSY)
4919                error = 0;
4920 out_put:
4921        dlm_put_lkb(lkb);
4922 out:
4923        dlm_unlock_recovery(ls);
4924        return error;
4925}
4926
4927/* lkb's that are removed from the waiters list by revert are just left on the
4928   orphans list with the granted orphan locks, to be freed by purge */
4929
4930static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4931{
4932        struct dlm_args args;
4933        int error;
4934
4935        hold_lkb(lkb);
4936        mutex_lock(&ls->ls_orphans_mutex);
4937        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4938        mutex_unlock(&ls->ls_orphans_mutex);
4939
4940        set_unlock_args(0, lkb->lkb_ua, &args);
4941
4942        error = cancel_lock(ls, lkb, &args);
4943        if (error == -DLM_ECANCEL)
4944                error = 0;
4945        return error;
4946}
4947
4948/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4949   Regardless of what rsb queue the lock is on, it's removed and freed. */
4950
4951static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4952{
4953        struct dlm_args args;
4954        int error;
4955
4956        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4957
4958        error = unlock_lock(ls, lkb, &args);
4959        if (error == -DLM_EUNLOCK)
4960                error = 0;
4961        return error;
4962}
4963
4964/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4965   (which does lock_rsb) due to deadlock with receiving a message that does
4966   lock_rsb followed by dlm_user_add_cb() */
4967
4968static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969                                     struct dlm_user_proc *proc)
4970{
4971        struct dlm_lkb *lkb = NULL;
4972
4973        mutex_lock(&ls->ls_clear_proc_locks);
4974        if (list_empty(&proc->locks))
4975                goto out;
4976
4977        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4978        list_del_init(&lkb->lkb_ownqueue);
4979
4980        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4981                lkb->lkb_flags |= DLM_IFL_ORPHAN;
4982        else
4983                lkb->lkb_flags |= DLM_IFL_DEAD;
4984 out:
4985        mutex_unlock(&ls->ls_clear_proc_locks);
4986        return lkb;
4987}
4988
4989/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
4990   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4991   which we clear here. */
4992
4993/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4994   list, and no more device_writes should add lkb's to proc->locks list; so we
4995   shouldn't need to take asts_spin or locks_spin here.  this assumes that
4996   device reads/writes/closes are serialized -- FIXME: we may need to serialize
4997   them ourself. */
4998
4999void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5000{
5001        struct dlm_lkb *lkb, *safe;
5002
5003        dlm_lock_recovery(ls);
5004
5005        while (1) {
5006                lkb = del_proc_lock(ls, proc);
5007                if (!lkb)
5008                        break;
5009                del_timeout(lkb);
5010                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5011                        orphan_proc_lock(ls, lkb);
5012                else
5013                        unlock_proc_lock(ls, lkb);
5014
5015                /* this removes the reference for the proc->locks list
5016                   added by dlm_user_request, it may result in the lkb
5017                   being freed */
5018
5019                dlm_put_lkb(lkb);
5020        }
5021
5022        mutex_lock(&ls->ls_clear_proc_locks);
5023
5024        /* in-progress unlocks */
5025        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5026                list_del_init(&lkb->lkb_ownqueue);
5027                lkb->lkb_flags |= DLM_IFL_DEAD;
5028                dlm_put_lkb(lkb);
5029        }
5030
5031        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5032                memset(&lkb->lkb_callbacks, 0,
5033                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5034                list_del_init(&lkb->lkb_cb_list);
5035                dlm_put_lkb(lkb);
5036        }
5037
5038        mutex_unlock(&ls->ls_clear_proc_locks);
5039        dlm_unlock_recovery(ls);
5040}
5041
5042static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5043{
5044        struct dlm_lkb *lkb, *safe;
5045
5046        while (1) {
5047                lkb = NULL;
5048                spin_lock(&proc->locks_spin);
5049                if (!list_empty(&proc->locks)) {
5050                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
5051                                         lkb_ownqueue);
5052                        list_del_init(&lkb->lkb_ownqueue);
5053                }
5054                spin_unlock(&proc->locks_spin);
5055
5056                if (!lkb)
5057                        break;
5058
5059                lkb->lkb_flags |= DLM_IFL_DEAD;
5060                unlock_proc_lock(ls, lkb);
5061                dlm_put_lkb(lkb); /* ref from proc->locks list */
5062        }
5063
5064        spin_lock(&proc->locks_spin);
5065        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5066                list_del_init(&lkb->lkb_ownqueue);
5067                lkb->lkb_flags |= DLM_IFL_DEAD;
5068                dlm_put_lkb(lkb);
5069        }
5070        spin_unlock(&proc->locks_spin);
5071
5072        spin_lock(&proc->asts_spin);
5073        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5074                memset(&lkb->lkb_callbacks, 0,
5075                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5076                list_del_init(&lkb->lkb_cb_list);
5077                dlm_put_lkb(lkb);
5078        }
5079        spin_unlock(&proc->asts_spin);
5080}
5081
5082/* pid of 0 means purge all orphans */
5083
5084static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5085{
5086        struct dlm_lkb *lkb, *safe;
5087
5088        mutex_lock(&ls->ls_orphans_mutex);
5089        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5090                if (pid && lkb->lkb_ownpid != pid)
5091                        continue;
5092                unlock_proc_lock(ls, lkb);
5093                list_del_init(&lkb->lkb_ownqueue);
5094                dlm_put_lkb(lkb);
5095        }
5096        mutex_unlock(&ls->ls_orphans_mutex);
5097}
5098
5099static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5100{
5101        struct dlm_message *ms;
5102        struct dlm_mhandle *mh;
5103        int error;
5104
5105        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5106                                DLM_MSG_PURGE, &ms, &mh);
5107        if (error)
5108                return error;
5109        ms->m_nodeid = nodeid;
5110        ms->m_pid = pid;
5111
5112        return send_message(mh, ms);
5113}
5114
5115int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5116                   int nodeid, int pid)
5117{
5118        int error = 0;
5119
5120        if (nodeid != dlm_our_nodeid()) {
5121                error = send_purge(ls, nodeid, pid);
5122        } else {
5123                dlm_lock_recovery(ls);
5124                if (pid == current->pid)
5125                        purge_proc_locks(ls, proc);
5126                else
5127                        do_purge(ls, nodeid, pid);
5128                dlm_unlock_recovery(ls);
5129        }
5130        return error;
5131}
5132
5133
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.