linux/fs/dlm/lock.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include <linux/slab.h>
  60#include "dlm_internal.h"
  61#include <linux/dlm_device.h>
  62#include "memory.h"
  63#include "lowcomms.h"
  64#include "requestqueue.h"
  65#include "util.h"
  66#include "dir.h"
  67#include "member.h"
  68#include "lockspace.h"
  69#include "ast.h"
  70#include "lock.h"
  71#include "rcom.h"
  72#include "recover.h"
  73#include "lvb_table.h"
  74#include "user.h"
  75#include "config.h"
  76
  77static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  78static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  79static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  80static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  81static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  82static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  83static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  84static int send_remove(struct dlm_rsb *r);
  85static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  86static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  87static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
  88                                    struct dlm_message *ms);
  89static int receive_extralen(struct dlm_message *ms);
  90static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  91static void del_timeout(struct dlm_lkb *lkb);
  92
  93/*
  94 * Lock compatibility matrix - thanks Steve
  95 * UN = Unlocked state. Not really a state, used as a flag
  96 * PD = Padding. Used to make the matrix a nice power of two in size
  97 * Other states are the same as the VMS DLM.
  98 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
  99 */
 100
 101static const int __dlm_compat_matrix[8][8] = {
 102      /* UN NL CR CW PR PW EX PD */
 103        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 104        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 105        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 106        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 107        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 108        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 109        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 110        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 111};
 112
 113/*
 114 * This defines the direction of transfer of LVB data.
 115 * Granted mode is the row; requested mode is the column.
 116 * Usage: matrix[grmode+1][rqmode+1]
 117 * 1 = LVB is returned to the caller
 118 * 0 = LVB is written to the resource
 119 * -1 = nothing happens to the LVB
 120 */
 121
 122const int dlm_lvb_operations[8][8] = {
 123        /* UN   NL  CR  CW  PR  PW  EX  PD*/
 124        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
 125        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
 126        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
 127        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
 128        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
 129        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
 130        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
 131        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
 132};
 133
 134#define modes_compat(gr, rq) \
 135        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
 136
 137int dlm_modes_compat(int mode1, int mode2)
 138{
 139        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 140}
 141
 142/*
 143 * Compatibility matrix for conversions with QUECVT set.
 144 * Granted mode is the row; requested mode is the column.
 145 * Usage: matrix[grmode+1][rqmode+1]
 146 */
 147
 148static const int __quecvt_compat_matrix[8][8] = {
 149      /* UN NL CR CW PR PW EX PD */
 150        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
 151        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
 152        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
 153        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
 154        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
 155        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
 156        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
 157        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 158};
 159
 160void dlm_print_lkb(struct dlm_lkb *lkb)
 161{
 162        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 163               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
 164               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 165               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 166               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 167}
 168
 169static void dlm_print_rsb(struct dlm_rsb *r)
 170{
 171        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
 172               r->res_nodeid, r->res_flags, r->res_first_lkid,
 173               r->res_recover_locks_count, r->res_name);
 174}
 175
 176void dlm_dump_rsb(struct dlm_rsb *r)
 177{
 178        struct dlm_lkb *lkb;
 179
 180        dlm_print_rsb(r);
 181
 182        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 183               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 184        printk(KERN_ERR "rsb lookup list\n");
 185        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 186                dlm_print_lkb(lkb);
 187        printk(KERN_ERR "rsb grant queue:\n");
 188        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 189                dlm_print_lkb(lkb);
 190        printk(KERN_ERR "rsb convert queue:\n");
 191        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 192                dlm_print_lkb(lkb);
 193        printk(KERN_ERR "rsb wait queue:\n");
 194        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 195                dlm_print_lkb(lkb);
 196}
 197
 198/* Threads cannot use the lockspace while it's being recovered */
 199
 200static inline void dlm_lock_recovery(struct dlm_ls *ls)
 201{
 202        down_read(&ls->ls_in_recovery);
 203}
 204
 205void dlm_unlock_recovery(struct dlm_ls *ls)
 206{
 207        up_read(&ls->ls_in_recovery);
 208}
 209
 210int dlm_lock_recovery_try(struct dlm_ls *ls)
 211{
 212        return down_read_trylock(&ls->ls_in_recovery);
 213}
 214
 215static inline int can_be_queued(struct dlm_lkb *lkb)
 216{
 217        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 218}
 219
 220static inline int force_blocking_asts(struct dlm_lkb *lkb)
 221{
 222        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 223}
 224
 225static inline int is_demoted(struct dlm_lkb *lkb)
 226{
 227        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 228}
 229
 230static inline int is_altmode(struct dlm_lkb *lkb)
 231{
 232        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 233}
 234
 235static inline int is_granted(struct dlm_lkb *lkb)
 236{
 237        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 238}
 239
 240static inline int is_remote(struct dlm_rsb *r)
 241{
 242        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 243        return !!r->res_nodeid;
 244}
 245
 246static inline int is_process_copy(struct dlm_lkb *lkb)
 247{
 248        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 249}
 250
 251static inline int is_master_copy(struct dlm_lkb *lkb)
 252{
 253        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
 254                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 255        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 256}
 257
 258static inline int middle_conversion(struct dlm_lkb *lkb)
 259{
 260        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 261            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 262                return 1;
 263        return 0;
 264}
 265
 266static inline int down_conversion(struct dlm_lkb *lkb)
 267{
 268        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 269}
 270
 271static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 272{
 273        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 274}
 275
 276static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 277{
 278        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 279}
 280
 281static inline int is_overlap(struct dlm_lkb *lkb)
 282{
 283        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 284                                  DLM_IFL_OVERLAP_CANCEL));
 285}
 286
 287static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 288{
 289        if (is_master_copy(lkb))
 290                return;
 291
 292        del_timeout(lkb);
 293
 294        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 295
 296        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 297           timeout caused the cancel then return -ETIMEDOUT */
 298        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 299                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 300                rv = -ETIMEDOUT;
 301        }
 302
 303        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 304                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 305                rv = -EDEADLK;
 306        }
 307
 308        lkb->lkb_lksb->sb_status = rv;
 309        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 310
 311        dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
 312}
 313
 314static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 315{
 316        queue_cast(r, lkb,
 317                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 318}
 319
 320static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 321{
 322        lkb->lkb_time_bast = ktime_get();
 323
 324        if (is_master_copy(lkb)) {
 325                lkb->lkb_bastmode = rqmode; /* printed by debugfs */
 326                send_bast(r, lkb, rqmode);
 327        } else {
 328                dlm_add_ast(lkb, AST_BAST, rqmode);
 329        }
 330}
 331
 332/*
 333 * Basic operations on rsb's and lkb's
 334 */
 335
 336static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 337{
 338        struct dlm_rsb *r;
 339
 340        r = dlm_allocate_rsb(ls, len);
 341        if (!r)
 342                return NULL;
 343
 344        r->res_ls = ls;
 345        r->res_length = len;
 346        memcpy(r->res_name, name, len);
 347        mutex_init(&r->res_mutex);
 348
 349        INIT_LIST_HEAD(&r->res_lookup);
 350        INIT_LIST_HEAD(&r->res_grantqueue);
 351        INIT_LIST_HEAD(&r->res_convertqueue);
 352        INIT_LIST_HEAD(&r->res_waitqueue);
 353        INIT_LIST_HEAD(&r->res_root_list);
 354        INIT_LIST_HEAD(&r->res_recover_list);
 355
 356        return r;
 357}
 358
 359static int search_rsb_list(struct list_head *head, char *name, int len,
 360                           unsigned int flags, struct dlm_rsb **r_ret)
 361{
 362        struct dlm_rsb *r;
 363        int error = 0;
 364
 365        list_for_each_entry(r, head, res_hashchain) {
 366                if (len == r->res_length && !memcmp(name, r->res_name, len))
 367                        goto found;
 368        }
 369        *r_ret = NULL;
 370        return -EBADR;
 371
 372 found:
 373        if (r->res_nodeid && (flags & R_MASTER))
 374                error = -ENOTBLK;
 375        *r_ret = r;
 376        return error;
 377}
 378
 379static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 380                       unsigned int flags, struct dlm_rsb **r_ret)
 381{
 382        struct dlm_rsb *r;
 383        int error;
 384
 385        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
 386        if (!error) {
 387                kref_get(&r->res_ref);
 388                goto out;
 389        }
 390        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 391        if (error)
 392                goto out;
 393
 394        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
 395
 396        if (dlm_no_directory(ls))
 397                goto out;
 398
 399        if (r->res_nodeid == -1) {
 400                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 401                r->res_first_lkid = 0;
 402        } else if (r->res_nodeid > 0) {
 403                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 404                r->res_first_lkid = 0;
 405        } else {
 406                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
 407                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
 408        }
 409 out:
 410        *r_ret = r;
 411        return error;
 412}
 413
 414static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 415                      unsigned int flags, struct dlm_rsb **r_ret)
 416{
 417        int error;
 418        spin_lock(&ls->ls_rsbtbl[b].lock);
 419        error = _search_rsb(ls, name, len, b, flags, r_ret);
 420        spin_unlock(&ls->ls_rsbtbl[b].lock);
 421        return error;
 422}
 423
 424/*
 425 * Find rsb in rsbtbl and potentially create/add one
 426 *
 427 * Delaying the release of rsb's has a similar benefit to applications keeping
 428 * NL locks on an rsb, but without the guarantee that the cached master value
 429 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 430 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 431 * to excessive master lookups and removals if we don't delay the release.
 432 *
 433 * Searching for an rsb means looking through both the normal list and toss
 434 * list.  When found on the toss list the rsb is moved to the normal list with
 435 * ref count of 1; when found on normal list the ref count is incremented.
 436 */
 437
 438static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 439                    unsigned int flags, struct dlm_rsb **r_ret)
 440{
 441        struct dlm_rsb *r = NULL, *tmp;
 442        uint32_t hash, bucket;
 443        int error = -EINVAL;
 444
 445        if (namelen > DLM_RESNAME_MAXLEN)
 446                goto out;
 447
 448        if (dlm_no_directory(ls))
 449                flags |= R_CREATE;
 450
 451        error = 0;
 452        hash = jhash(name, namelen, 0);
 453        bucket = hash & (ls->ls_rsbtbl_size - 1);
 454
 455        error = search_rsb(ls, name, namelen, bucket, flags, &r);
 456        if (!error)
 457                goto out;
 458
 459        if (error == -EBADR && !(flags & R_CREATE))
 460                goto out;
 461
 462        /* the rsb was found but wasn't a master copy */
 463        if (error == -ENOTBLK)
 464                goto out;
 465
 466        error = -ENOMEM;
 467        r = create_rsb(ls, name, namelen);
 468        if (!r)
 469                goto out;
 470
 471        r->res_hash = hash;
 472        r->res_bucket = bucket;
 473        r->res_nodeid = -1;
 474        kref_init(&r->res_ref);
 475
 476        /* With no directory, the master can be set immediately */
 477        if (dlm_no_directory(ls)) {
 478                int nodeid = dlm_dir_nodeid(r);
 479                if (nodeid == dlm_our_nodeid())
 480                        nodeid = 0;
 481                r->res_nodeid = nodeid;
 482        }
 483
 484        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 485        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
 486        if (!error) {
 487                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 488                dlm_free_rsb(r);
 489                r = tmp;
 490                goto out;
 491        }
 492        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
 493        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 494        error = 0;
 495 out:
 496        *r_ret = r;
 497        return error;
 498}
 499
 500/* This is only called to add a reference when the code already holds
 501   a valid reference to the rsb, so there's no need for locking. */
 502
 503static inline void hold_rsb(struct dlm_rsb *r)
 504{
 505        kref_get(&r->res_ref);
 506}
 507
 508void dlm_hold_rsb(struct dlm_rsb *r)
 509{
 510        hold_rsb(r);
 511}
 512
 513static void toss_rsb(struct kref *kref)
 514{
 515        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 516        struct dlm_ls *ls = r->res_ls;
 517
 518        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 519        kref_init(&r->res_ref);
 520        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
 521        r->res_toss_time = jiffies;
 522        if (r->res_lvbptr) {
 523                dlm_free_lvb(r->res_lvbptr);
 524                r->res_lvbptr = NULL;
 525        }
 526}
 527
 528/* When all references to the rsb are gone it's transferred to
 529   the tossed list for later disposal. */
 530
 531static void put_rsb(struct dlm_rsb *r)
 532{
 533        struct dlm_ls *ls = r->res_ls;
 534        uint32_t bucket = r->res_bucket;
 535
 536        spin_lock(&ls->ls_rsbtbl[bucket].lock);
 537        kref_put(&r->res_ref, toss_rsb);
 538        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 539}
 540
 541void dlm_put_rsb(struct dlm_rsb *r)
 542{
 543        put_rsb(r);
 544}
 545
 546/* See comment for unhold_lkb */
 547
 548static void unhold_rsb(struct dlm_rsb *r)
 549{
 550        int rv;
 551        rv = kref_put(&r->res_ref, toss_rsb);
 552        DLM_ASSERT(!rv, dlm_dump_rsb(r););
 553}
 554
 555static void kill_rsb(struct kref *kref)
 556{
 557        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 558
 559        /* All work is done after the return from kref_put() so we
 560           can release the write_lock before the remove and free. */
 561
 562        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 563        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 564        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 565        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 566        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 567        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 568}
 569
 570/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
 571   The rsb must exist as long as any lkb's for it do. */
 572
 573static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 574{
 575        hold_rsb(r);
 576        lkb->lkb_resource = r;
 577}
 578
 579static void detach_lkb(struct dlm_lkb *lkb)
 580{
 581        if (lkb->lkb_resource) {
 582                put_rsb(lkb->lkb_resource);
 583                lkb->lkb_resource = NULL;
 584        }
 585}
 586
 587static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 588{
 589        struct dlm_lkb *lkb, *tmp;
 590        uint32_t lkid = 0;
 591        uint16_t bucket;
 592
 593        lkb = dlm_allocate_lkb(ls);
 594        if (!lkb)
 595                return -ENOMEM;
 596
 597        lkb->lkb_nodeid = -1;
 598        lkb->lkb_grmode = DLM_LOCK_IV;
 599        kref_init(&lkb->lkb_ref);
 600        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 601        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 602        INIT_LIST_HEAD(&lkb->lkb_time_list);
 603
 604        get_random_bytes(&bucket, sizeof(bucket));
 605        bucket &= (ls->ls_lkbtbl_size - 1);
 606
 607        write_lock(&ls->ls_lkbtbl[bucket].lock);
 608
 609        /* counter can roll over so we must verify lkid is not in use */
 610
 611        while (lkid == 0) {
 612                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
 613
 614                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
 615                                    lkb_idtbl_list) {
 616                        if (tmp->lkb_id != lkid)
 617                                continue;
 618                        lkid = 0;
 619                        break;
 620                }
 621        }
 622
 623        lkb->lkb_id = lkid;
 624        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
 625        write_unlock(&ls->ls_lkbtbl[bucket].lock);
 626
 627        *lkb_ret = lkb;
 628        return 0;
 629}
 630
 631static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 632{
 633        struct dlm_lkb *lkb;
 634        uint16_t bucket = (lkid >> 16);
 635
 636        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
 637                if (lkb->lkb_id == lkid)
 638                        return lkb;
 639        }
 640        return NULL;
 641}
 642
 643static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 644{
 645        struct dlm_lkb *lkb;
 646        uint16_t bucket = (lkid >> 16);
 647
 648        if (bucket >= ls->ls_lkbtbl_size)
 649                return -EBADSLT;
 650
 651        read_lock(&ls->ls_lkbtbl[bucket].lock);
 652        lkb = __find_lkb(ls, lkid);
 653        if (lkb)
 654                kref_get(&lkb->lkb_ref);
 655        read_unlock(&ls->ls_lkbtbl[bucket].lock);
 656
 657        *lkb_ret = lkb;
 658        return lkb ? 0 : -ENOENT;
 659}
 660
 661static void kill_lkb(struct kref *kref)
 662{
 663        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
 664
 665        /* All work is done after the return from kref_put() so we
 666           can release the write_lock before the detach_lkb */
 667
 668        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 669}
 670
 671/* __put_lkb() is used when an lkb may not have an rsb attached to
 672   it so we need to provide the lockspace explicitly */
 673
 674static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 675{
 676        uint16_t bucket = (lkb->lkb_id >> 16);
 677
 678        write_lock(&ls->ls_lkbtbl[bucket].lock);
 679        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
 680                list_del(&lkb->lkb_idtbl_list);
 681                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 682
 683                detach_lkb(lkb);
 684
 685                /* for local/process lkbs, lvbptr points to caller's lksb */
 686                if (lkb->lkb_lvbptr && is_master_copy(lkb))
 687                        dlm_free_lvb(lkb->lkb_lvbptr);
 688                dlm_free_lkb(lkb);
 689                return 1;
 690        } else {
 691                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 692                return 0;
 693        }
 694}
 695
 696int dlm_put_lkb(struct dlm_lkb *lkb)
 697{
 698        struct dlm_ls *ls;
 699
 700        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
 701        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
 702
 703        ls = lkb->lkb_resource->res_ls;
 704        return __put_lkb(ls, lkb);
 705}
 706
 707/* This is only called to add a reference when the code already holds
 708   a valid reference to the lkb, so there's no need for locking. */
 709
 710static inline void hold_lkb(struct dlm_lkb *lkb)
 711{
 712        kref_get(&lkb->lkb_ref);
 713}
 714
 715/* This is called when we need to remove a reference and are certain
 716   it's not the last ref.  e.g. del_lkb is always called between a
 717   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
 718   put_lkb would work fine, but would involve unnecessary locking */
 719
 720static inline void unhold_lkb(struct dlm_lkb *lkb)
 721{
 722        int rv;
 723        rv = kref_put(&lkb->lkb_ref, kill_lkb);
 724        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
 725}
 726
 727static void lkb_add_ordered(struct list_head *new, struct list_head *head,
 728                            int mode)
 729{
 730        struct dlm_lkb *lkb = NULL;
 731
 732        list_for_each_entry(lkb, head, lkb_statequeue)
 733                if (lkb->lkb_rqmode < mode)
 734                        break;
 735
 736        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 737}
 738
 739/* add/remove lkb to rsb's grant/convert/wait queue */
 740
 741static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 742{
 743        kref_get(&lkb->lkb_ref);
 744
 745        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 746
 747        lkb->lkb_timestamp = ktime_get();
 748
 749        lkb->lkb_status = status;
 750
 751        switch (status) {
 752        case DLM_LKSTS_WAITING:
 753                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 754                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 755                else
 756                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 757                break;
 758        case DLM_LKSTS_GRANTED:
 759                /* convention says granted locks kept in order of grmode */
 760                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 761                                lkb->lkb_grmode);
 762                break;
 763        case DLM_LKSTS_CONVERT:
 764                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 765                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 766                else
 767                        list_add_tail(&lkb->lkb_statequeue,
 768                                      &r->res_convertqueue);
 769                break;
 770        default:
 771                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
 772        }
 773}
 774
 775static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 776{
 777        lkb->lkb_status = 0;
 778        list_del(&lkb->lkb_statequeue);
 779        unhold_lkb(lkb);
 780}
 781
 782static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
 783{
 784        hold_lkb(lkb);
 785        del_lkb(r, lkb);
 786        add_lkb(r, lkb, sts);
 787        unhold_lkb(lkb);
 788}
 789
 790static int msg_reply_type(int mstype)
 791{
 792        switch (mstype) {
 793        case DLM_MSG_REQUEST:
 794                return DLM_MSG_REQUEST_REPLY;
 795        case DLM_MSG_CONVERT:
 796                return DLM_MSG_CONVERT_REPLY;
 797        case DLM_MSG_UNLOCK:
 798                return DLM_MSG_UNLOCK_REPLY;
 799        case DLM_MSG_CANCEL:
 800                return DLM_MSG_CANCEL_REPLY;
 801        case DLM_MSG_LOOKUP:
 802                return DLM_MSG_LOOKUP_REPLY;
 803        }
 804        return -1;
 805}
 806
 807/* add/remove lkb from global waiters list of lkb's waiting for
 808   a reply from a remote node */
 809
 810static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 811{
 812        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 813        int error = 0;
 814
 815        mutex_lock(&ls->ls_waiters_mutex);
 816
 817        if (is_overlap_unlock(lkb) ||
 818            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
 819                error = -EINVAL;
 820                goto out;
 821        }
 822
 823        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
 824                switch (mstype) {
 825                case DLM_MSG_UNLOCK:
 826                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
 827                        break;
 828                case DLM_MSG_CANCEL:
 829                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
 830                        break;
 831                default:
 832                        error = -EBUSY;
 833                        goto out;
 834                }
 835                lkb->lkb_wait_count++;
 836                hold_lkb(lkb);
 837
 838                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
 839                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
 840                          lkb->lkb_wait_count, lkb->lkb_flags);
 841                goto out;
 842        }
 843
 844        DLM_ASSERT(!lkb->lkb_wait_count,
 845                   dlm_print_lkb(lkb);
 846                   printk("wait_count %d\n", lkb->lkb_wait_count););
 847
 848        lkb->lkb_wait_count++;
 849        lkb->lkb_wait_type = mstype;
 850        hold_lkb(lkb);
 851        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 852 out:
 853        if (error)
 854                log_error(ls, "addwait error %x %d flags %x %d %d %s",
 855                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
 856                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 857        mutex_unlock(&ls->ls_waiters_mutex);
 858        return error;
 859}
 860
 861/* We clear the RESEND flag because we might be taking an lkb off the waiters
 862   list as part of process_requestqueue (e.g. a lookup that has an optimized
 863   request reply on the requestqueue) between dlm_recover_waiters_pre() which
 864   set RESEND and dlm_recover_waiters_post() */
 865
    /* Account for a reply of type mstype on lkb.  Depending on the reply this
       clears the matching overlap flag and/or lkb_wait_type, decrements
       lkb_wait_count, and unlinks lkb_wait_reply from the waiters list once
       the count reaches zero.  One unhold_lkb() is done on the success path.

       ms may be NULL (remove_from_waiters() passes NULL); it is only
       consulted for m_result in the convert-reply case.

       Returns 0 when a wait was retired, -1 when the reply does not match
       any recorded wait.  The callers visible here hold ls_waiters_mutex
       around the call, except remove_from_waiters_ms() with the stub
       message. */
 866static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 867                                struct dlm_message *ms)
 868{
 869        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 870        int overlap_done = 0;
 871
            /* reply to a forced unlock that overlapped an earlier op */
 872        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
 873                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
 874                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
 875                overlap_done = 1;
 876                goto out_del;
 877        }
 878
            /* reply to a cancel that overlapped an earlier op */
 879        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
 880                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
 881                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 882                overlap_done = 1;
 883                goto out_del;
 884        }
 885
 886        /* Cancel state was preemptively cleared by a successful convert,
 887           see next comment, nothing to do. */
 888
 889        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
 890            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
 891                log_debug(ls, "remwait %x cancel_reply wait_type %d",
 892                          lkb->lkb_id, lkb->lkb_wait_type);
 893                return -1;
 894        }
 895
 896        /* Remove for the convert reply, and preemptively remove for the
 897           cancel reply.  A convert has been granted while there's still
 898           an outstanding cancel on it (the cancel is moot and the result
 899           in the cancel reply should be 0).  We preempt the cancel reply
 900           because the app gets the convert result and then can follow up
 901           with another op, like convert.  This subsequent op would see the
 902           lingering state of the cancel and fail with -EBUSY. */
 903
 904        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
 905            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
 906            is_overlap_cancel(lkb) && ms && !ms->m_result) {
 907                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
 908                          lkb->lkb_id);
 909                lkb->lkb_wait_type = 0;
 910                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                    /* two waits are retired on this path: one here and one
                       by the common decrement at out_del */
 911                lkb->lkb_wait_count--;
 912                goto out_del;
 913        }
 914
 915        /* N.B. type of reply may not always correspond to type of original
 916           msg due to lookup->request optimization, verify others? */
 917
 918        if (lkb->lkb_wait_type) {
 919                lkb->lkb_wait_type = 0;
 920                goto out_del;
 921        }
 922
            /* no overlap flag matched and no wait_type recorded */
 923        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
 924                  lkb->lkb_id, mstype, lkb->lkb_flags);
 925        return -1;
 926
 927 out_del:
 928        /* the force-unlock/cancel has completed and we haven't recvd a reply
 929           to the op that was in progress prior to the unlock/cancel; we
 930           give up on any reply to the earlier op.  FIXME: not sure when/how
 931           this would happen */
 932
 933        if (overlap_done && lkb->lkb_wait_type) {
 934                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
 935                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
 936                lkb->lkb_wait_count--;
 937                lkb->lkb_wait_type = 0;
 938        }
 939
            /* at least one wait must still be outstanding for this reply */
 940        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 941
 942        lkb->lkb_flags &= ~DLM_IFL_RESEND;
 943        lkb->lkb_wait_count--;
            /* only leave the waiters list once every wait has been retired */
 944        if (!lkb->lkb_wait_count)
 945                list_del_init(&lkb->lkb_wait_reply);
 946        unhold_lkb(lkb);
 947        return 0;
 948}
 949
 950static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 951{
 952        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 953        int error;
 954
 955        mutex_lock(&ls->ls_waiters_mutex);
 956        error = _remove_from_waiters(lkb, mstype, NULL);
 957        mutex_unlock(&ls->ls_waiters_mutex);
 958        return error;
 959}
 960
 961/* Handles situations where we might be processing a "fake" or "stub" reply in
 962   which we can't try to take waiters_mutex again. */
 963
 964static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 965{
 966        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 967        int error;
 968
 969        if (ms != &ls->ls_stub_ms)
 970                mutex_lock(&ls->ls_waiters_mutex);
 971        error = _remove_from_waiters(lkb, ms->m_type, ms);
 972        if (ms != &ls->ls_stub_ms)
 973                mutex_unlock(&ls->ls_waiters_mutex);
 974        return error;
 975}
 976
 977static void dir_remove(struct dlm_rsb *r)
 978{
 979        int to_nodeid;
 980
 981        if (dlm_no_directory(r->res_ls))
 982                return;
 983
 984        to_nodeid = dlm_dir_nodeid(r);
 985        if (to_nodeid != dlm_our_nodeid())
 986                send_remove(r);
 987        else
 988                dlm_dir_remove_entry(r->res_ls, to_nodeid,
 989                                     r->res_name, r->res_length);
 990}
 991
 992/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
 993   found since they are in order of newest to oldest? */
 994
 995static int shrink_bucket(struct dlm_ls *ls, int b)
 996{
 997        struct dlm_rsb *r;
 998        int count = 0, found;
 999
1000        for (;;) {
1001                found = 0;
1002                spin_lock(&ls->ls_rsbtbl[b].lock);
1003                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                            res_hashchain) {
1005                        if (!time_after_eq(jiffies, r->res_toss_time +
1006                                           dlm_config.ci_toss_secs * HZ))
1007                                continue;
1008                        found = 1;
1009                        break;
1010                }
1011
1012                if (!found) {
1013                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                        break;
1015                }
1016
1017                if (kref_put(&r->res_ref, kill_rsb)) {
1018                        list_del(&r->res_hashchain);
1019                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                        if (is_master(r))
1022                                dir_remove(r);
1023                        dlm_free_rsb(r);
1024                        count++;
1025                } else {
1026                        spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                        log_error(ls, "tossed rsb in use %s", r->res_name);
1028                }
1029        }
1030
1031        return count;
1032}
1033
1034void dlm_scan_rsbs(struct dlm_ls *ls)
1035{
1036        int i;
1037
1038        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                shrink_bucket(ls, i);
1040                if (dlm_locking_stopped(ls))
1041                        break;
1042                cond_resched();
1043        }
1044}
1045
1046static void add_timeout(struct dlm_lkb *lkb)
1047{
1048        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050        if (is_master_copy(lkb))
1051                return;
1052
1053        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                goto add_it;
1057        }
1058        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                goto add_it;
1060        return;
1061
1062 add_it:
1063        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064        mutex_lock(&ls->ls_timeout_mutex);
1065        hold_lkb(lkb);
1066        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067        mutex_unlock(&ls->ls_timeout_mutex);
1068}
1069
1070static void del_timeout(struct dlm_lkb *lkb)
1071{
1072        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074        mutex_lock(&ls->ls_timeout_mutex);
1075        if (!list_empty(&lkb->lkb_time_list)) {
1076                list_del_init(&lkb->lkb_time_list);
1077                unhold_lkb(lkb);
1078        }
1079        mutex_unlock(&ls->ls_timeout_mutex);
1080}
1081
1082/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084   and then lock rsb because of lock ordering in add_timeout.  We may need
1085   to specify some special timeout-related bits in the lkb that are just to
1086   be accessed under the timeout_mutex. */
1087
    /* Scan the lockspace timeout list and act on locks that have waited too
       long.  Each outer iteration finds one lkb that is due for a warning
       and/or a cancel, handles it under the rsb lock, and then rescans from
       the head of the list.  The scan ends when nothing is due or when
       locking has been stopped. */
1088void dlm_scan_timeout(struct dlm_ls *ls)
1089{
1090        struct dlm_rsb *r;
1091        struct dlm_lkb *lkb;
1092        int do_cancel, do_warn;
1093        s64 wait_us;
1094
1095        for (;;) {
1096                if (dlm_locking_stopped(ls))
1097                        break;
1098
1099                do_cancel = 0;
1100                do_warn = 0;
1101                mutex_lock(&ls->ls_timeout_mutex);
1102                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
                            /* time elapsed since lkb_timestamp, in
                               microseconds */
1104                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                        lkb->lkb_timestamp));
1106
                            /* the _cs values are scaled by 10000 to compare
                               against microseconds */
1107                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                            wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                do_cancel = 1;
1110
1111                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                do_warn = 1;
1114
1115                        if (!do_cancel && !do_warn)
1116                                continue;
                            /* keep the lkb alive after timeout_mutex is
                               dropped; released by dlm_put_lkb() below */
1117                        hold_lkb(lkb);
1118                        break;
1119                }
1120                mutex_unlock(&ls->ls_timeout_mutex);
1121
                    /* whole list scanned and nothing was due */
1122                if (!do_cancel && !do_warn)
1123                        break;
1124
                    /* the rsb lock is taken only after timeout_mutex has been
                       released (see the lock ordering note above) */
1125                r = lkb->lkb_resource;
1126                hold_rsb(r);
1127                lock_rsb(r);
1128
1129                if (do_warn) {
1130                        /* clear flag so we only warn once */
1131                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                            /* without TIMEOUT there is nothing further to
                               watch for, so take it off the list */
1132                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                del_timeout(lkb);
1134                        dlm_timeout_warn(lkb);
1135                }
1136
1137                if (do_cancel) {
1138                        log_debug(ls, "timeout cancel %x node %d %s",
1139                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                        del_timeout(lkb);
1143                        _cancel_lock(r, lkb);
1144                }
1145
1146                unlock_rsb(r);
1147                unhold_rsb(r);
1148                dlm_put_lkb(lkb);
1149        }
1150}
1151
1152/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153   dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155void dlm_adjust_timeouts(struct dlm_ls *ls)
1156{
1157        struct dlm_lkb *lkb;
1158        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160        ls->ls_recover_begin = 0;
1161        mutex_lock(&ls->ls_timeout_mutex);
1162        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164        mutex_unlock(&ls->ls_timeout_mutex);
1165}
1166
1167/* lkb is master or local copy */
1168
1169static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170{
1171        int b, len = r->res_ls->ls_lvblen;
1172
1173        /* b=1 lvb returned to caller
1174           b=0 lvb written to rsb or invalidated
1175           b=-1 do nothing */
1176
1177        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179        if (b == 1) {
1180                if (!lkb->lkb_lvbptr)
1181                        return;
1182
1183                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                        return;
1185
1186                if (!r->res_lvbptr)
1187                        return;
1188
1189                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192        } else if (b == 0) {
1193                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                        rsb_set_flag(r, RSB_VALNOTVALID);
1195                        return;
1196                }
1197
1198                if (!lkb->lkb_lvbptr)
1199                        return;
1200
1201                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                        return;
1203
1204                if (!r->res_lvbptr)
1205                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                if (!r->res_lvbptr)
1208                        return;
1209
1210                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                r->res_lvbseq++;
1212                lkb->lkb_lvbseq = r->res_lvbseq;
1213                rsb_clear_flag(r, RSB_VALNOTVALID);
1214        }
1215
1216        if (rsb_flag(r, RSB_VALNOTVALID))
1217                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218}
1219
1220static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221{
1222        if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                return;
1224
1225        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                rsb_set_flag(r, RSB_VALNOTVALID);
1227                return;
1228        }
1229
1230        if (!lkb->lkb_lvbptr)
1231                return;
1232
1233        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                return;
1235
1236        if (!r->res_lvbptr)
1237                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239        if (!r->res_lvbptr)
1240                return;
1241
1242        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243        r->res_lvbseq++;
1244        rsb_clear_flag(r, RSB_VALNOTVALID);
1245}
1246
1247/* lkb is process copy (pc) */
1248
1249static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                            struct dlm_message *ms)
1251{
1252        int b;
1253
1254        if (!lkb->lkb_lvbptr)
1255                return;
1256
1257        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                return;
1259
1260        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261        if (b == 1) {
1262                int len = receive_extralen(ms);
1263                if (len > DLM_RESNAME_MAXLEN)
1264                        len = DLM_RESNAME_MAXLEN;
1265                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                lkb->lkb_lvbseq = ms->m_lvbseq;
1267        }
1268}
1269
1270/* Manipulate lkb's on rsb's convert/granted/waiting queues
1271   remove_lock -- used for unlock, removes lkb from granted
1272   revert_lock -- used for cancel, moves lkb from convert to granted
1273   grant_lock  -- used for request and convert, adds lkb to granted or
1274                  moves lkb from convert or waiting to granted
1275
1276   Each of these is used for master or local copy lkb's.  There is
1277   also a _pc() variation used to make the corresponding change on
1278   a process copy (pc) lkb. */
1279
1280static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281{
1282        del_lkb(r, lkb);
1283        lkb->lkb_grmode = DLM_LOCK_IV;
1284        /* this unhold undoes the original ref from create_lkb()
1285           so this leads to the lkb being freed */
1286        unhold_lkb(lkb);
1287}
1288
/* Unlock for a master or local copy lkb: update the rsb's lvb first, then
   remove the lock. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);

        _remove_lock(r, lkb);
}
1294
/* Unlock for a process copy lkb: no lvb handling, just remove the lock. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1299
1300/* returns: 0 did nothing
1301            1 moved lock to granted
1302           -1 removed lock */
1303
1304static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305{
1306        int rv = 0;
1307
1308        lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310        switch (lkb->lkb_status) {
1311        case DLM_LKSTS_GRANTED:
1312                break;
1313        case DLM_LKSTS_CONVERT:
1314                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                rv = 1;
1316                break;
1317        case DLM_LKSTS_WAITING:
1318                del_lkb(r, lkb);
1319                lkb->lkb_grmode = DLM_LOCK_IV;
1320                /* this unhold undoes the original ref from create_lkb()
1321                   so this leads to the lkb being freed */
1322                unhold_lkb(lkb);
1323                rv = -1;
1324                break;
1325        default:
1326                log_print("invalid status for revert %d", lkb->lkb_status);
1327        }
1328        return rv;
1329}
1330
/* Process copy variant of revert_lock(); the handling is identical, so it
   simply forwards and returns the same 0 / 1 / -1 result. */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = revert_lock(r, lkb);

        return rv;
}
1335
1336static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337{
1338        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                lkb->lkb_grmode = lkb->lkb_rqmode;
1340                if (lkb->lkb_status)
1341                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                else
1343                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344        }
1345
1346        lkb->lkb_rqmode = DLM_LOCK_IV;
1347}
1348
1349static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350{
1351        set_lvb_lock(r, lkb);
1352        _grant_lock(r, lkb);
1353        lkb->lkb_highbast = 0;
1354}
1355
/* Grant for a process copy lkb: take the lvb from the reply message ms,
   then grant the lock. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);

        _grant_lock(r, lkb);
}
1362
1363/* called by grant_pending_locks() which means an async grant message must
1364   be sent to the requesting node in addition to granting the lock if the
1365   lkb belongs to a remote node. */
1366
/* Grant a lock that had been queued.  After granting, a master copy lkb
   gets a grant message sent for it; any other lkb gets a completion
   callback queued with status 0. */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb))
                queue_cast(r, lkb, 0);
        else
                send_grant(r, lkb);
}
1375
1376/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377   change the granted/requested modes.  We're munging things accordingly in
1378   the process copy.
1379   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380   conversion deadlock
1381   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382   compatible with other granted locks */
1383
1384static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385{
1386        if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                log_print("munge_demoted %x invalid reply type %d",
1388                          lkb->lkb_id, ms->m_type);
1389                return;
1390        }
1391
1392        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                return;
1396        }
1397
1398        lkb->lkb_grmode = DLM_LOCK_NL;
1399}
1400
1401static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402{
1403        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404            ms->m_type != DLM_MSG_GRANT) {
1405                log_print("munge_altmode %x invalid reply type %d",
1406                          lkb->lkb_id, ms->m_type);
1407                return;
1408        }
1409
1410        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                lkb->lkb_rqmode = DLM_LOCK_PR;
1412        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                lkb->lkb_rqmode = DLM_LOCK_CW;
1414        else {
1415                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                dlm_print_lkb(lkb);
1417        }
1418}
1419
1420static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421{
1422        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                           lkb_statequeue);
1424        if (lkb->lkb_id == first->lkb_id)
1425                return 1;
1426
1427        return 0;
1428}
1429
1430/* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433{
1434        struct dlm_lkb *this;
1435
1436        list_for_each_entry(this, head, lkb_statequeue) {
1437                if (this == lkb)
1438                        continue;
1439                if (!modes_compat(this, lkb))
1440                        return 1;
1441        }
1442        return 0;
1443}
1444
1445/*
1446 * "A conversion deadlock arises with a pair of lock requests in the converting
1447 * queue for one resource.  The granted mode of each lock blocks the requested
1448 * mode of the other lock."
1449 *
1450 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451 * convert queue from being granted, then deadlk/demote lkb.
1452 *
1453 * Example:
1454 * Granted Queue: empty
1455 * Convert Queue: NL->EX (first lock)
1456 *                PR->EX (second lock)
1457 *
1458 * The first lock can't be granted because of the granted mode of the second
1459 * lock and the second lock can't be granted because it's not first in the
1460 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462 * flag set and return DEMOTED in the lksb flags.
1463 *
1464 * Originally, this function detected conv-deadlk in a more limited scope:
1465 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466 * - if lkb1 was the first entry in the queue (not just earlier), and was
1467 *   blocked by the granted mode of lkb2, and there was nothing on the
1468 *   granted queue preventing lkb1 from being granted immediately, i.e.
1469 *   lkb2 was the only thing preventing lkb1 from being granted.
1470 *
1471 * That second condition meant we'd only say there was conv-deadlk if
1472 * resolving it (by demotion) would lead to the first lock on the convert
1473 * queue being granted right away.  It allowed conversion deadlocks to exist
1474 * between locks on the convert queue while they couldn't be granted anyway.
1475 *
1476 * Now, we detect and take action on conversion deadlocks immediately when
1477 * they're created, even if they may not be immediately consequential.  If
1478 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479 * mode that would prevent lkb1's conversion from being granted, we do a
1480 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481 * I think this means that the lkb_is_ahead condition below should always
1482 * be zero, i.e. there will never be conv-deadlk between two locks that are
1483 * both already on the convert queue.
1484 */
1485
1486static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487{
1488        struct dlm_lkb *lkb1;
1489        int lkb_is_ahead = 0;
1490
1491        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                if (lkb1 == lkb2) {
1493                        lkb_is_ahead = 1;
1494                        continue;
1495                }
1496
1497                if (!lkb_is_ahead) {
1498                        if (!modes_compat(lkb2, lkb1))
1499                                return 1;
1500                } else {
1501                        if (!modes_compat(lkb2, lkb1) &&
1502                            !modes_compat(lkb1, lkb2))
1503                                return 1;
1504                }
1505        }
1506        return 0;
1507}
1508
1509/*
1510 * Return 1 if the lock can be granted, 0 otherwise.
1511 * Also detect and resolve conversion deadlocks.
1512 *
1513 * lkb is the lock to be granted
1514 *
1515 * now is 1 if the function is being called in the context of the
1516 * immediate request, it is 0 if called later, after the lock has been
1517 * queued.
1518 *
1519 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520 */
1521
1522static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523{
            /* conv is nonzero when the lkb already has a granted mode, i.e.
               the request is a conversion rather than a new request */
1524        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526        /*
1527         * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528         * a new request for a NL mode lock being blocked.
1529         *
1530         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531         * request, then it would be granted.  In essence, the use of this flag
1532         * tells the Lock Manager to expedite this request by not considering
1533         * what may be in the CONVERTING or WAITING queues...  As of this
1534         * writing, the EXPEDITE flag can be used only with new requests for NL
1535         * mode locks.  This flag is not valid for conversion requests.
1536         *
1537         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538         * conversion or used with a non-NL requested mode.  We also know an
1539         * EXPEDITE request is always granted immediately, so now must always
1540         * be 1.  The full condition to grant an expedite request: (now &&
1541         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542         * therefore be shortened to just checking the flag.
1543         */
1544
1545        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                return 1;
1547
1548        /*
1549         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550         * added to the remaining conditions.
1551         */
1552
1553        if (queue_conflict(&r->res_grantqueue, lkb))
1554                goto out;
1555
1556        /*
1557         * 6-3: By default, a conversion request is immediately granted if the
1558         * requested mode is compatible with the modes of all other granted
1559         * locks
1560         */
1561
1562        if (queue_conflict(&r->res_convertqueue, lkb))
1563                goto out;
1564
1565        /*
1566         * 6-5: But the default algorithm for deciding whether to grant or
1567         * queue conversion requests does not by itself guarantee that such
1568         * requests are serviced on a "first come first serve" basis.  This, in
1569         * turn, can lead to a phenomenon known as "indefinite postponement".
1570         *
1571         * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572         * the system service employed to request a lock conversion.  This flag
1573         * forces certain conversion requests to be queued, even if they are
1574         * compatible with the granted modes of other locks on the same
1575         * resource.  Thus, the use of this flag results in conversion requests
1576         * being ordered on a "first come first serve" basis.
1577         *
1578         * DCT: This condition is all about new conversions being able to occur
1579         * "in place" while the lock remains on the granted queue (assuming
1580         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581         * doesn't _have_ to go onto the convert queue where it's processed in
1582         * order.  The "now" variable is necessary to distinguish converts
1583         * being received and processed for the first time now, because once a
1584         * convert is moved to the conversion queue the condition below applies
1585         * requiring fifo granting.
1586         */
1587
1588        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                return 1;
1590
1591        /*
1592         * The NOORDER flag is set to avoid the standard vms rules on grant
1593         * order.
1594         */
1595
1596        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                return 1;
1598
1599        /*
1600         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601         * granted until all other conversion requests ahead of it are granted
1602         * and/or canceled.
1603         */
1604
1605        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                return 1;
1607
1608        /*
1609         * 6-4: By default, a new request is immediately granted only if all
1610         * three of the following conditions are satisfied when the request is
1611         * issued:
1612         * - The queue of ungranted conversion requests for the resource is
1613         *   empty.
1614         * - The queue of ungranted new requests for the resource is empty.
1615         * - The mode of the new request is compatible with the most
1616         *   restrictive mode of all granted locks on the resource.
1617         */
1618
1619        if (now && !conv && list_empty(&r->res_convertqueue) &&
1620            list_empty(&r->res_waitqueue))
1621                return 1;
1622
1623        /*
1624         * 6-4: Once a lock request is in the queue of ungranted new requests,
1625         * it cannot be granted until the queue of ungranted conversion
1626         * requests is empty, all ungranted new requests ahead of it are
1627         * granted and/or canceled, and it is compatible with the granted mode
1628         * of the most restrictive lock granted on the resource.
1629         */
1630
1631        if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632            first_in_list(lkb, &r->res_waitqueue))
1633                return 1;
            /* none of the grant rules matched, or a queue conflict was found */
1634 out:
1635        return 0;
1636}
1637
1638static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                          int *err)
1640{
1641        int rv;
1642        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645        if (err)
1646                *err = 0;
1647
1648        rv = _can_be_granted(r, lkb, now);
1649        if (rv)
1650                goto out;
1651
1652        /*
1653         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655         * cancels one of the locks.
1656         */
1657
1658        if (is_convert && can_be_queued(lkb) &&
1659            conversion_deadlock_detect(r, lkb)) {
1660                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                        lkb->lkb_grmode = DLM_LOCK_NL;
1662                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                        if (err)
1665                                *err = -EDEADLK;
1666                        else {
1667                                log_print("can_be_granted deadlock %x now %d",
1668                                          lkb->lkb_id, now);
1669                                dlm_dump_rsb(r);
1670                        }
1671                }
1672                goto out;
1673        }
1674
1675        /*
1676         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677         * to grant a request in a mode other than the normal rqmode.  It's a
1678         * simple way to provide a big optimization to applications that can
1679         * use them.
1680         */
1681
1682        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                alt = DLM_LOCK_PR;
1684        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                alt = DLM_LOCK_CW;
1686
1687        if (alt) {
1688                lkb->lkb_rqmode = alt;
1689                rv = _can_be_granted(r, lkb, now);
1690                if (rv)
1691                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                else
1693                        lkb->lkb_rqmode = rqmode;
1694        }
1695 out:
1696        return rv;
1697}
1698
1699/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700   for locks pending on the convert list.  Once verified (watch for these
1701   log_prints), we should be able to just call _can_be_granted() and not
1702   bother with the demote/deadlk cases here (and there's no easy way to deal
1703   with a deadlk here, we'd have to generate something like grant_lock with
1704   the deadlk error.) */
1705
1706/* Returns the highest requested mode of all blocked conversions; sets
1707   cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710{
1711        struct dlm_lkb *lkb, *s;
1712        int hi, demoted, quit, grant_restart, demote_restart;
1713        int deadlk;
1714
            /* quit limits demotion-triggered rescans to a single extra pass */
1715        quit = 0;
1716 restart:
            /* every pass starts from scratch, including the running maximum */
1717        grant_restart = 0;
1718        demote_restart = 0;
1719        hi = DLM_LOCK_IV;
1720
            /* the _safe iterator is used because the loop body can grant
               the current lkb */
1721        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
                    /* remember the demoted state so a demotion done inside
                       can_be_granted() can be detected afterwards */
1722                demoted = is_demoted(lkb);
1723                deadlk = 0;
1724
1725                if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                        grant_lock_pending(r, lkb);
1727                        grant_restart = 1;
1728                        continue;
1729                }
1730
1731                if (!demoted && is_demoted(lkb)) {
1732                        log_print("WARN: pending demoted %x node %d %s",
1733                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                        demote_restart = 1;
1735                        continue;
1736                }
1737
1738                if (deadlk) {
1739                        log_print("WARN: pending deadlock %x node %d %s",
1740                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                        dlm_dump_rsb(r);
1742                        continue;
1743                }
1744
                    /* still blocked: fold its requested mode into the
                       result; granted, newly demoted and deadlocked entries
                       skipped this via continue */
1745                hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                        *cw = 1;
1749        }
1750
            /* rescan the whole queue after any grant */
1751        if (grant_restart)
1752                goto restart;
            /* rescan once more after a demotion, but only once */
1753        if (demote_restart && !quit) {
1754                quit = 1;
1755                goto restart;
1756        }
1757
1758        return max_t(int, high, hi);
1759}
1760
1761static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762{
1763        struct dlm_lkb *lkb, *s;
1764
1765        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                if (can_be_granted(r, lkb, 0, NULL))
1767                        grant_lock_pending(r, lkb);
1768                else {
1769                        high = max_t(int, lkb->lkb_rqmode, high);
1770                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                *cw = 1;
1772                }
1773        }
1774
1775        return high;
1776}
1777
1778/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779   on either the convert or waiting queue.
1780   high is the largest rqmode of all locks blocked on the convert or
1781   waiting queue. */
1782
1783static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784{
1785        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                if (gr->lkb_highbast < DLM_LOCK_EX)
1787                        return 1;
1788                return 0;
1789        }
1790
1791        if (gr->lkb_highbast < high &&
1792            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                return 1;
1794        return 0;
1795}
1796
1797static void grant_pending_locks(struct dlm_rsb *r)
1798{
1799        struct dlm_lkb *lkb, *s;
1800        int high = DLM_LOCK_IV;
1801        int cw = 0;
1802
1803        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805        high = grant_pending_convert(r, high, &cw);
1806        high = grant_pending_wait(r, high, &cw);
1807
1808        if (high == DLM_LOCK_IV)
1809                return;
1810
1811        /*
1812         * If there are locks left on the wait/convert queue then send blocking
1813         * ASTs to granted locks based on the largest requested mode (high)
1814         * found above.
1815         */
1816
1817        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                        if (cw && high == DLM_LOCK_PR &&
1820                            lkb->lkb_grmode == DLM_LOCK_PR)
1821                                queue_bast(r, lkb, DLM_LOCK_CW);
1822                        else
1823                                queue_bast(r, lkb, high);
1824                        lkb->lkb_highbast = high;
1825                }
1826        }
1827}
1828
1829static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830{
1831        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                if (gr->lkb_highbast < DLM_LOCK_EX)
1834                        return 1;
1835                return 0;
1836        }
1837
1838        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                return 1;
1840        return 0;
1841}
1842
1843static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                            struct dlm_lkb *lkb)
1845{
1846        struct dlm_lkb *gr;
1847
1848        list_for_each_entry(gr, head, lkb_statequeue) {
1849                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                        queue_bast(r, gr, lkb->lkb_rqmode);
1851                        gr->lkb_highbast = lkb->lkb_rqmode;
1852                }
1853        }
1854}
1855
1856static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857{
1858        send_bast_queue(r, &r->res_grantqueue, lkb);
1859}
1860
1861static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862{
1863        send_bast_queue(r, &r->res_grantqueue, lkb);
1864        send_bast_queue(r, &r->res_convertqueue, lkb);
1865}
1866
1867/* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869   The purpose of this function is to set the nodeid field in the given
1870   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871   known, it can just be copied to the lkb and the function will return
1872   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873   before it can be copied to the lkb.
1874
1875   When the rsb nodeid is being looked up remotely, the initial lkb
1876   causing the lookup is kept on the ls_waiters list waiting for the
1877   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878   on the rsb's res_lookup list until the master is verified.
1879
   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
   <0: the local directory lookup failed with an error other than
      -EEXIST; that error is returned and the nodeid is not set
*/
1885
1886static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887{
1888        struct dlm_ls *ls = r->res_ls;
1889        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                r->res_first_lkid = lkb->lkb_id;
1894                lkb->lkb_nodeid = r->res_nodeid;
1895                return 0;
1896        }
1897
1898        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                return 1;
1901        }
1902
1903        if (r->res_nodeid == 0) {
1904                lkb->lkb_nodeid = 0;
1905                return 0;
1906        }
1907
1908        if (r->res_nodeid > 0) {
1909                lkb->lkb_nodeid = r->res_nodeid;
1910                return 0;
1911        }
1912
1913        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915        dir_nodeid = dlm_dir_nodeid(r);
1916
1917        if (dir_nodeid != our_nodeid) {
1918                r->res_first_lkid = lkb->lkb_id;
1919                send_lookup(r, lkb);
1920                return 1;
1921        }
1922
1923        for (i = 0; i < 2; i++) {
1924                /* It's possible for dlm_scand to remove an old rsb for
1925                   this same resource from the toss list, us to create
1926                   a new one, look up the master locally, and find it
1927                   already exists just before dlm_scand does the
1928                   dir_remove() on the previous rsb. */
1929
1930                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                       r->res_length, &ret_nodeid);
1932                if (!error)
1933                        break;
1934                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                schedule();
1936        }
1937        if (error && error != -EEXIST)
1938                return error;
1939
1940        if (ret_nodeid == our_nodeid) {
1941                r->res_first_lkid = 0;
1942                r->res_nodeid = 0;
1943                lkb->lkb_nodeid = 0;
1944        } else {
1945                r->res_first_lkid = lkb->lkb_id;
1946                r->res_nodeid = ret_nodeid;
1947                lkb->lkb_nodeid = ret_nodeid;
1948        }
1949        return 0;
1950}
1951
1952static void process_lookup_list(struct dlm_rsb *r)
1953{
1954        struct dlm_lkb *lkb, *safe;
1955
1956        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                list_del_init(&lkb->lkb_rsb_lookup);
1958                _request_lock(r, lkb);
1959                schedule();
1960        }
1961}
1962
1963/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965static void confirm_master(struct dlm_rsb *r, int error)
1966{
1967        struct dlm_lkb *lkb;
1968
1969        if (!r->res_first_lkid)
1970                return;
1971
1972        switch (error) {
1973        case 0:
1974        case -EINPROGRESS:
1975                r->res_first_lkid = 0;
1976                process_lookup_list(r);
1977                break;
1978
1979        case -EAGAIN:
1980        case -EBADR:
1981        case -ENOTBLK:
1982                /* the remote request failed and won't be retried (it was
1983                   a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                   lkb the first_lkid */
1985
1986                r->res_first_lkid = 0;
1987
1988                if (!list_empty(&r->res_lookup)) {
1989                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                         lkb_rsb_lookup);
1991                        list_del_init(&lkb->lkb_rsb_lookup);
1992                        r->res_first_lkid = lkb->lkb_id;
1993                        _request_lock(r, lkb);
1994                }
1995                break;
1996
1997        default:
1998                log_error(r->res_ls, "confirm_master unknown error %d", error);
1999        }
2000}
2001
2002static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                         int namelen, unsigned long timeout_cs,
2004                         void (*ast) (void *astparam),
2005                         void *astparam,
2006                         void (*bast) (void *astparam, int mode),
2007                         struct dlm_args *args)
2008{
2009        int rv = -EINVAL;
2010
2011        /* check for invalid arg usage */
2012
2013        if (mode < 0 || mode > DLM_LOCK_EX)
2014                goto out;
2015
2016        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                goto out;
2018
2019        if (flags & DLM_LKF_CANCEL)
2020                goto out;
2021
2022        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                goto out;
2024
2025        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                goto out;
2027
2028        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                goto out;
2030
2031        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                goto out;
2033
2034        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                goto out;
2036
2037        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                goto out;
2039
2040        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                goto out;
2042
2043        if (!ast || !lksb)
2044                goto out;
2045
2046        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                goto out;
2048
2049        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                goto out;
2051
2052        /* these args will be copied to the lkb in validate_lock_args,
2053           it cannot be done now because when converting locks, fields in
2054           an active lkb cannot be modified before locking the rsb */
2055
2056        args->flags = flags;
2057        args->astfn = ast;
2058        args->astparam = astparam;
2059        args->bastfn = bast;
2060        args->timeout = timeout_cs;
2061        args->mode = mode;
2062        args->lksb = lksb;
2063        rv = 0;
2064 out:
2065        return rv;
2066}
2067
2068static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069{
2070        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                      DLM_LKF_FORCEUNLOCK))
2072                return -EINVAL;
2073
2074        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                return -EINVAL;
2076
2077        args->flags = flags;
2078        args->astparam = astarg;
2079        return 0;
2080}
2081
2082static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                              struct dlm_args *args)
2084{
2085        int rv = -EINVAL;
2086
2087        if (args->flags & DLM_LKF_CONVERT) {
2088                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                        goto out;
2090
2091                if (args->flags & DLM_LKF_QUECVT &&
2092                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                        goto out;
2094
2095                rv = -EBUSY;
2096                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                        goto out;
2098
2099                if (lkb->lkb_wait_type)
2100                        goto out;
2101
2102                if (is_overlap(lkb))
2103                        goto out;
2104        }
2105
2106        lkb->lkb_exflags = args->flags;
2107        lkb->lkb_sbflags = 0;
2108        lkb->lkb_astfn = args->astfn;
2109        lkb->lkb_astparam = args->astparam;
2110        lkb->lkb_bastfn = args->bastfn;
2111        lkb->lkb_rqmode = args->mode;
2112        lkb->lkb_lksb = args->lksb;
2113        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114        lkb->lkb_ownpid = (int) current->pid;
2115        lkb->lkb_timeout_cs = args->timeout;
2116        rv = 0;
2117 out:
2118        if (rv)
2119                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                          lkb->lkb_status, lkb->lkb_wait_type,
2122                          lkb->lkb_resource->res_name);
2123        return rv;
2124}
2125
2126/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127   for success */
2128
2129/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130   because there may be a lookup in progress and it's valid to do
2131   cancel/unlockf on it */
2132
2133static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134{
2135        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136        int rv = -EINVAL;
2137
2138        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                dlm_print_lkb(lkb);
2141                goto out;
2142        }
2143
2144        /* an lkb may still exist even though the lock is EOL'ed due to a
2145           cancel, unlock or failed noqueue request; an app can't use these
2146           locks; return same error as if the lkid had not been found at all */
2147
2148        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                rv = -ENOENT;
2151                goto out;
2152        }
2153
2154        /* an lkb may be waiting for an rsb lookup to complete where the
2155           lookup was initiated by another lock */
2156
2157        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                        list_del_init(&lkb->lkb_rsb_lookup);
2161                        queue_cast(lkb->lkb_resource, lkb,
2162                                   args->flags & DLM_LKF_CANCEL ?
2163                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2164                        unhold_lkb(lkb); /* undoes create_lkb() */
2165                }
2166                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                rv = -EBUSY;
2168                goto out;
2169        }
2170
2171        /* cancel not allowed with another cancel/unlock in progress */
2172
2173        if (args->flags & DLM_LKF_CANCEL) {
2174                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                        goto out;
2176
2177                if (is_overlap(lkb))
2178                        goto out;
2179
2180                /* don't let scand try to do a cancel */
2181                del_timeout(lkb);
2182
2183                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                        rv = -EBUSY;
2186                        goto out;
2187                }
2188
2189                /* there's nothing to cancel */
2190                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                    !lkb->lkb_wait_type) {
2192                        rv = -EBUSY;
2193                        goto out;
2194                }
2195
2196                switch (lkb->lkb_wait_type) {
2197                case DLM_MSG_LOOKUP:
2198                case DLM_MSG_REQUEST:
2199                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                        rv = -EBUSY;
2201                        goto out;
2202                case DLM_MSG_UNLOCK:
2203                case DLM_MSG_CANCEL:
2204                        goto out;
2205                }
2206                /* add_to_waiters() will set OVERLAP_CANCEL */
2207                goto out_ok;
2208        }
2209
2210        /* do we need to allow a force-unlock if there's a normal unlock
2211           already in progress?  in what conditions could the normal unlock
2212           fail such that we'd want to send a force-unlock to be sure? */
2213
2214        if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                        goto out;
2217
2218                if (is_overlap_unlock(lkb))
2219                        goto out;
2220
2221                /* don't let scand try to do a cancel */
2222                del_timeout(lkb);
2223
2224                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                        rv = -EBUSY;
2227                        goto out;
2228                }
2229
2230                switch (lkb->lkb_wait_type) {
2231                case DLM_MSG_LOOKUP:
2232                case DLM_MSG_REQUEST:
2233                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                        rv = -EBUSY;
2235                        goto out;
2236                case DLM_MSG_UNLOCK:
2237                        goto out;
2238                }
2239                /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                goto out_ok;
2241        }
2242
2243        /* normal unlock not allowed if there's any op in progress */
2244        rv = -EBUSY;
2245        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                goto out;
2247
2248 out_ok:
2249        /* an overlapping op shouldn't blow away exflags from other op */
2250        lkb->lkb_exflags |= args->flags;
2251        lkb->lkb_sbflags = 0;
2252        lkb->lkb_astparam = args->astparam;
2253        rv = 0;
2254 out:
2255        if (rv)
2256                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                          args->flags, lkb->lkb_wait_type,
2259                          lkb->lkb_resource->res_name);
2260        return rv;
2261}
2262
2263/*
2264 * Four stage 4 varieties:
2265 * do_request(), do_convert(), do_unlock(), do_cancel()
2266 * These are called on the master node for the given lock and
2267 * from the central locking logic.
2268 */
2269
2270static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271{
2272        int error = 0;
2273
2274        if (can_be_granted(r, lkb, 1, NULL)) {
2275                grant_lock(r, lkb);
2276                queue_cast(r, lkb, 0);
2277                goto out;
2278        }
2279
2280        if (can_be_queued(lkb)) {
2281                error = -EINPROGRESS;
2282                add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                add_timeout(lkb);
2284                goto out;
2285        }
2286
2287        error = -EAGAIN;
2288        queue_cast(r, lkb, -EAGAIN);
2289 out:
2290        return error;
2291}
2292
2293static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2294                               int error)
2295{
2296        switch (error) {
2297        case -EAGAIN:
2298                if (force_blocking_asts(lkb))
2299                        send_blocking_asts_all(r, lkb);
2300                break;
2301        case -EINPROGRESS:
2302                send_blocking_asts(r, lkb);
2303                break;
2304        }
2305}
2306
2307static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2308{
2309        int error = 0;
2310        int deadlk = 0;
2311
2312        /* changing an existing lock may allow others to be granted */
2313
2314        if (can_be_granted(r, lkb, 1, &deadlk)) {
2315                grant_lock(r, lkb);
2316                queue_cast(r, lkb, 0);
2317                goto out;
2318        }
2319
2320        /* can_be_granted() detected that this lock would block in a conversion
2321           deadlock, so we leave it on the granted queue and return EDEADLK in
2322           the ast for the convert. */
2323
2324        if (deadlk) {
2325                /* it's left on the granted queue */
2326                log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2327                          lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2328                          lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2329                revert_lock(r, lkb);
2330                queue_cast(r, lkb, -EDEADLK);
2331                error = -EDEADLK;
2332                goto out;
2333        }
2334
2335        /* is_demoted() means the can_be_granted() above set the grmode
2336           to NL, and left us on the granted queue.  This auto-demotion
2337           (due to CONVDEADLK) might mean other locks, and/or this lock, are
2338           now grantable.  We have to try to grant other converting locks
2339           before we try again to grant this one. */
2340
2341        if (is_demoted(lkb)) {
2342                grant_pending_convert(r, DLM_LOCK_IV, NULL);
2343                if (_can_be_granted(r, lkb, 1)) {
2344                        grant_lock(r, lkb);
2345                        queue_cast(r, lkb, 0);
2346                        goto out;
2347                }
2348                /* else fall through and move to convert queue */
2349        }
2350
2351        if (can_be_queued(lkb)) {
2352                error = -EINPROGRESS;
2353                del_lkb(r, lkb);
2354                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2355                add_timeout(lkb);
2356                goto out;
2357        }
2358
2359        error = -EAGAIN;
2360        queue_cast(r, lkb, -EAGAIN);
2361 out:
2362        return error;
2363}
2364
2365static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2366                               int error)
2367{
2368        switch (error) {
2369        case 0:
2370                grant_pending_locks(r);
2371                /* grant_pending_locks also sends basts */
2372                break;
2373        case -EAGAIN:
2374                if (force_blocking_asts(lkb))
2375                        send_blocking_asts_all(r, lkb);
2376                break;
2377        case -EINPROGRESS:
2378                send_blocking_asts(r, lkb);
2379                break;
2380        }
2381}
2382
2383static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2384{
2385        remove_lock(r, lkb);
2386        queue_cast(r, lkb, -DLM_EUNLOCK);
2387        return -DLM_EUNLOCK;
2388}
2389
/*
 * Follow-up work after do_unlock(): re-run grant_pending_locks() on the
 * rsb.  'lkb' and 'error' are not used here.
 */
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	/* unconditional: the result of do_unlock() is not consulted */
	grant_pending_locks(r);
}
2395
2396/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2397 
2398static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2399{
2400        int error;
2401
2402        error = revert_lock(r, lkb);
2403        if (error) {
2404                queue_cast(r, lkb, -DLM_ECANCEL);
2405                return -DLM_ECANCEL;
2406        }
2407        return 0;
2408}
2409
/*
 * Follow-up work after do_cancel(): when the cancel did something
 * (non-zero result), re-run grant_pending_locks() on the rsb.
 */
static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (!error)
		return;

	grant_pending_locks(r);
}
2416
2417/*
2418 * Four stage 3 varieties:
2419 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2420 */
2421
2422/* add a new lkb to a possibly new rsb, called by requesting process */
2423
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */
	error = set_master(r, lkb);

	/* negative: set_master failed, hand the error back */
	if (error < 0)
		return error;

	/* positive: the lkb is waiting for the master to be determined;
	   that is reported to the caller as 0 */
	if (error > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	error = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, error);
	return error;
}
2450
2451/* change some property of an existing lkb, e.g. mode */
2452
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	error = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, error);
	return error;
}
2469
2470/* remove an existing lkb from the granted queue */
2471
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	error = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, error);
	return error;
}
2488
2489/* remove an existing lkb from the convert or wait queue */
2490
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	error = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, error);
	return error;
}
2507
2508/*
2509 * Four stage 2 varieties:
2510 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2511 */
2512
2513static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2514                        int len, struct dlm_args *args)
2515{
2516        struct dlm_rsb *r;
2517        int error;
2518
2519        error = validate_lock_args(ls, lkb, args);
2520        if (error)
2521                goto out;
2522
2523        error = find_rsb(ls, name, len, R_CREATE, &r);
2524        if (error)
2525                goto out;
2526
2527        lock_rsb(r);
2528
2529        attach_lkb(r, lkb);
2530        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2531
2532        error = _request_lock(r, lkb);
2533
2534        unlock_rsb(r);
2535        put_rsb(r);
2536
2537 out:
2538        return error;
2539}
2540
2541static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2542                        struct dlm_args *args)
2543{
2544        struct dlm_rsb *r;
2545        int error;
2546
2547        r = lkb->lkb_resource;
2548
2549        hold_rsb(r);
2550        lock_rsb(r);
2551
2552        error = validate_lock_args(ls, lkb, args);
2553        if (error)
2554                goto out;
2555
2556        error = _convert_lock(r, lkb);
2557 out:
2558        unlock_rsb(r);
2559        put_rsb(r);
2560        return error;
2561}
2562
2563static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2564                       struct dlm_args *args)
2565{
2566        struct dlm_rsb *r;
2567        int error;
2568
2569        r = lkb->lkb_resource;
2570
2571        hold_rsb(r);
2572        lock_rsb(r);
2573
2574        error = validate_unlock_args(lkb, args);
2575        if (error)
2576                goto out;
2577
2578        error = _unlock_lock(r, lkb);
2579 out:
2580        unlock_rsb(r);
2581        put_rsb(r);
2582        return error;
2583}
2584
2585static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2586                       struct dlm_args *args)
2587{
2588        struct dlm_rsb *r;
2589        int error;
2590
2591        r = lkb->lkb_resource;
2592
2593        hold_rsb(r);
2594        lock_rsb(r);
2595
2596        error = validate_unlock_args(lkb, args);
2597        if (error)
2598                goto out;
2599
2600        error = _cancel_lock(r, lkb);
2601 out:
2602        unlock_rsb(r);
2603        put_rsb(r);
2604        return error;
2605}
2606
2607/*
2608 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2609 */
2610
2611int dlm_lock(dlm_lockspace_t *lockspace,
2612             int mode,
2613             struct dlm_lksb *lksb,
2614             uint32_t flags,
2615             void *name,
2616             unsigned int namelen,
2617             uint32_t parent_lkid,
2618             void (*ast) (void *astarg),
2619             void *astarg,
2620             void (*bast) (void *astarg, int mode))
2621{
2622        struct dlm_ls *ls;
2623        struct dlm_lkb *lkb;
2624        struct dlm_args args;
2625        int error, convert = flags & DLM_LKF_CONVERT;
2626
2627        ls = dlm_find_lockspace_local(lockspace);
2628        if (!ls)
2629                return -EINVAL;
2630
2631        dlm_lock_recovery(ls);
2632
2633        if (convert)
2634                error = find_lkb(ls, lksb->sb_lkid, &lkb);
2635        else
2636                error = create_lkb(ls, &lkb);
2637
2638        if (error)
2639                goto out;
2640
2641        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2642                              astarg, bast, &args);
2643        if (error)
2644                goto out_put;
2645
2646        if (convert)
2647                error = convert_lock(ls, lkb, &args);
2648        else
2649                error = request_lock(ls, lkb, name, namelen, &args);
2650
2651        if (error == -EINPROGRESS)
2652                error = 0;
2653 out_put:
2654        if (convert || error)
2655                __put_lkb(ls, lkb);
2656        if (error == -EAGAIN || error == -EDEADLK)
2657                error = 0;
2658 out:
2659        dlm_unlock_recovery(ls);
2660        dlm_put_lockspace(ls);
2661        return error;
2662}
2663
2664int dlm_unlock(dlm_lockspace_t *lockspace,
2665               uint32_t lkid,
2666               uint32_t flags,
2667               struct dlm_lksb *lksb,
2668               void *astarg)
2669{
2670        struct dlm_ls *ls;
2671        struct dlm_lkb *lkb;
2672        struct dlm_args args;
2673        int error;
2674
2675        ls = dlm_find_lockspace_local(lockspace);
2676        if (!ls)
2677                return -EINVAL;
2678
2679        dlm_lock_recovery(ls);
2680
2681        error = find_lkb(ls, lkid, &lkb);
2682        if (error)
2683                goto out;
2684
2685        error = set_unlock_args(flags, astarg, &args);
2686        if (error)
2687                goto out_put;
2688
2689        if (flags & DLM_LKF_CANCEL)
2690                error = cancel_lock(ls, lkb, &args);
2691        else
2692                error = unlock_lock(ls, lkb, &args);
2693
2694        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2695                error = 0;
2696        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2697                error = 0;
2698 out_put:
2699        dlm_put_lkb(lkb);
2700 out:
2701        dlm_unlock_recovery(ls);
2702        dlm_put_lockspace(ls);
2703        return error;
2704}
2705
2706/*
2707 * send/receive routines for remote operations and replies
2708 *
2709 * send_args
2710 * send_common
2711 * send_request                 receive_request
2712 * send_convert                 receive_convert
2713 * send_unlock                  receive_unlock
2714 * send_cancel                  receive_cancel
2715 * send_grant                   receive_grant
2716 * send_bast                    receive_bast
2717 * send_lookup                  receive_lookup
2718 * send_remove                  receive_remove
2719 *
2720 *                              send_common_reply
2721 * receive_request_reply        send_request_reply
2722 * receive_convert_reply        send_convert_reply
2723 * receive_unlock_reply         send_unlock_reply
2724 * receive_cancel_reply         send_cancel_reply
2725 * receive_lookup_reply         send_lookup_reply
2726 */
2727
2728static int _create_message(struct dlm_ls *ls, int mb_len,
2729                           int to_nodeid, int mstype,
2730                           struct dlm_message **ms_ret,
2731                           struct dlm_mhandle **mh_ret)
2732{
2733        struct dlm_message *ms;
2734        struct dlm_mhandle *mh;
2735        char *mb;
2736
2737        /* get_buffer gives us a message handle (mh) that we need to
2738           pass into lowcomms_commit and a message buffer (mb) that we
2739           write our data into */
2740
2741        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2742        if (!mh)
2743                return -ENOBUFS;
2744
2745        memset(mb, 0, mb_len);
2746
2747        ms = (struct dlm_message *) mb;
2748
2749        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2750        ms->m_header.h_lockspace = ls->ls_global_id;
2751        ms->m_header.h_nodeid = dlm_our_nodeid();
2752        ms->m_header.h_length = mb_len;
2753        ms->m_header.h_cmd = DLM_MSG;
2754
2755        ms->m_type = mstype;
2756
2757        *mh_ret = mh;
2758        *ms_ret = ms;
2759        return 0;
2760}
2761
2762static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2763                          int to_nodeid, int mstype,
2764                          struct dlm_message **ms_ret,
2765                          struct dlm_mhandle **mh_ret)
2766{
2767        int mb_len = sizeof(struct dlm_message);
2768
2769        switch (mstype) {
2770        case DLM_MSG_REQUEST:
2771        case DLM_MSG_LOOKUP:
2772        case DLM_MSG_REMOVE:
2773                mb_len += r->res_length;
2774                break;
2775        case DLM_MSG_CONVERT:
2776        case DLM_MSG_UNLOCK:
2777        case DLM_MSG_REQUEST_REPLY:
2778        case DLM_MSG_CONVERT_REPLY:
2779        case DLM_MSG_GRANT:
2780                if (lkb && lkb->lkb_lvbptr)
2781                        mb_len += r->res_ls->ls_lvblen;
2782                break;
2783        }
2784
2785        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2786                               ms_ret, mh_ret);
2787}
2788
/* Run the message through dlm_message_out() and commit its buffer to
   lowcomms.  This always returns 0 at present; further lowcomms
   enhancements or alternate implementations may make the return value
   useful at some point. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
        dlm_message_out(ms);
        dlm_lowcomms_commit_buffer(mh);

        return 0;
}
2798
2799static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2800                      struct dlm_message *ms)
2801{
2802        ms->m_nodeid   = lkb->lkb_nodeid;
2803        ms->m_pid      = lkb->lkb_ownpid;
2804        ms->m_lkid     = lkb->lkb_id;
2805        ms->m_remid    = lkb->lkb_remid;
2806        ms->m_exflags  = lkb->lkb_exflags;
2807        ms->m_sbflags  = lkb->lkb_sbflags;
2808        ms->m_flags    = lkb->lkb_flags;
2809        ms->m_lvbseq   = lkb->lkb_lvbseq;
2810        ms->m_status   = lkb->lkb_status;
2811        ms->m_grmode   = lkb->lkb_grmode;
2812        ms->m_rqmode   = lkb->lkb_rqmode;
2813        ms->m_hash     = r->res_hash;
2814
2815        /* m_result and m_bastmode are set from function args,
2816           not from lkb fields */
2817
2818        if (lkb->lkb_bastfn)
2819                ms->m_asts |= AST_BAST;
2820        if (lkb->lkb_astfn)
2821                ms->m_asts |= AST_COMP;
2822
2823        /* compare with switch in create_message; send_remove() doesn't
2824           use send_args() */
2825
2826        switch (ms->m_type) {
2827        case DLM_MSG_REQUEST:
2828        case DLM_MSG_LOOKUP:
2829                memcpy(ms->m_extra, r->res_name, r->res_length);
2830                break;
2831        case DLM_MSG_CONVERT:
2832        case DLM_MSG_UNLOCK:
2833        case DLM_MSG_REQUEST_REPLY:
2834        case DLM_MSG_CONVERT_REPLY:
2835        case DLM_MSG_GRANT:
2836                if (!lkb->lkb_lvbptr)
2837                        break;
2838                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2839                break;
2840        }
2841}
2842
2843static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2844{
2845        struct dlm_message *ms;
2846        struct dlm_mhandle *mh;
2847        int to_nodeid, error;
2848
2849        error = add_to_waiters(lkb, mstype);
2850        if (error)
2851                return error;
2852
2853        to_nodeid = r->res_nodeid;
2854
2855        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2856        if (error)
2857                goto fail;
2858
2859        send_args(r, lkb, ms);
2860
2861        error = send_message(mh, ms);
2862        if (error)
2863                goto fail;
2864        return 0;
2865
2866 fail:
2867        remove_from_waiters(lkb, msg_reply_type(mstype));
2868        return error;
2869}
2870
2871static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2872{
2873        return send_common(r, lkb, DLM_MSG_REQUEST);
2874}
2875
2876static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2877{
2878        int error;
2879
2880        error = send_common(r, lkb, DLM_MSG_CONVERT);
2881
2882        /* down conversions go without a reply from the master */
2883        if (!error && down_conversion(lkb)) {
2884                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2885                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2886                r->res_ls->ls_stub_ms.m_result = 0;
2887                r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2888                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2889        }
2890
2891        return error;
2892}
2893
2894/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2895   MASTER_UNCERTAIN to force the next request on the rsb to confirm
2896   that the master is still correct. */
2897
2898static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2899{
2900        return send_common(r, lkb, DLM_MSG_UNLOCK);
2901}
2902
2903static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2904{
2905        return send_common(r, lkb, DLM_MSG_CANCEL);
2906}
2907
2908static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2909{
2910        struct dlm_message *ms;
2911        struct dlm_mhandle *mh;
2912        int to_nodeid, error;
2913
2914        to_nodeid = lkb->lkb_nodeid;
2915
2916        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2917        if (error)
2918                goto out;
2919
2920        send_args(r, lkb, ms);
2921
2922        ms->m_result = 0;
2923
2924        error = send_message(mh, ms);
2925 out:
2926        return error;
2927}
2928
2929static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2930{
2931        struct dlm_message *ms;
2932        struct dlm_mhandle *mh;
2933        int to_nodeid, error;
2934
2935        to_nodeid = lkb->lkb_nodeid;
2936
2937        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2938        if (error)
2939                goto out;
2940
2941        send_args(r, lkb, ms);
2942
2943        ms->m_bastmode = mode;
2944
2945        error = send_message(mh, ms);
2946 out:
2947        return error;
2948}
2949
2950static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2951{
2952        struct dlm_message *ms;
2953        struct dlm_mhandle *mh;
2954        int to_nodeid, error;
2955
2956        error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2957        if (error)
2958                return error;
2959
2960        to_nodeid = dlm_dir_nodeid(r);
2961
2962        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2963        if (error)
2964                goto fail;
2965
2966        send_args(r, lkb, ms);
2967
2968        error = send_message(mh, ms);
2969        if (error)
2970                goto fail;
2971        return 0;
2972
2973 fail:
2974        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2975        return error;
2976}
2977
2978static int send_remove(struct dlm_rsb *r)
2979{
2980        struct dlm_message *ms;
2981        struct dlm_mhandle *mh;
2982        int to_nodeid, error;
2983
2984        to_nodeid = dlm_dir_nodeid(r);
2985
2986        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2987        if (error)
2988                goto out;
2989
2990        memcpy(ms->m_extra, r->res_name, r->res_length);
2991        ms->m_hash = r->res_hash;
2992
2993        error = send_message(mh, ms);
2994 out:
2995        return error;
2996}
2997
2998static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2999                             int mstype, int rv)
3000{
3001        struct dlm_message *ms;
3002        struct dlm_mhandle *mh;
3003        int to_nodeid, error;
3004
3005        to_nodeid = lkb->lkb_nodeid;
3006
3007        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3008        if (error)
3009                goto out;
3010
3011        send_args(r, lkb, ms);
3012
3013        ms->m_result = rv;
3014
3015        error = send_message(mh, ms);
3016 out:
3017        return error;
3018}
3019
3020static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3021{
3022        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3023}
3024
3025static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3026{
3027        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3028}
3029
3030static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3031{
3032        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3033}
3034
3035static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3036{
3037        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3038}
3039
3040static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3041                             int ret_nodeid, int rv)
3042{
3043        struct dlm_rsb *r = &ls->ls_stub_rsb;
3044        struct dlm_message *ms;
3045        struct dlm_mhandle *mh;
3046        int error, nodeid = ms_in->m_header.h_nodeid;
3047
3048        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3049        if (error)
3050                goto out;
3051
3052        ms->m_lkid = ms_in->m_lkid;
3053        ms->m_result = rv;
3054        ms->m_nodeid = ret_nodeid;
3055
3056        error = send_message(mh, ms);
3057 out:
3058        return error;
3059}
3060
3061/* which args we save from a received message depends heavily on the type
3062   of message, unlike the send side where we can safely send everything about
3063   the lkb for any type of message */
3064
3065static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3066{
3067        lkb->lkb_exflags = ms->m_exflags;
3068        lkb->lkb_sbflags = ms->m_sbflags;
3069        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3070                         (ms->m_flags & 0x0000FFFF);
3071}
3072
3073static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3074{
3075        lkb->lkb_sbflags = ms->m_sbflags;
3076        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3077                         (ms->m_flags & 0x0000FFFF);
3078}
3079
3080static int receive_extralen(struct dlm_message *ms)
3081{
3082        return (ms->m_header.h_length - sizeof(struct dlm_message));
3083}
3084
3085static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3086                       struct dlm_message *ms)
3087{
3088        int len;
3089
3090        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3091                if (!lkb->lkb_lvbptr)
3092                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3093                if (!lkb->lkb_lvbptr)
3094                        return -ENOMEM;
3095                len = receive_extralen(ms);
3096                if (len > DLM_RESNAME_MAXLEN)
3097                        len = DLM_RESNAME_MAXLEN;
3098                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3099        }
3100        return 0;
3101}
3102
/* Placeholder blocking-ast callback; receive_request_args() stores it
   in an lkb when the request message has AST_BAST set.  It only logs
   that it was called. */

static void fake_bastfn(void *astparam, int mode)
{
        log_print("fake_bastfn should not be called");
}
3107
/* Placeholder completion-ast callback; receive_request_args() stores
   it in an lkb when the request message has AST_COMP set.  It only
   logs that it was called. */

static void fake_astfn(void *astparam)
{
        log_print("fake_astfn should not be called");
}
3112
3113static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3114                                struct dlm_message *ms)
3115{
3116        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3117        lkb->lkb_ownpid = ms->m_pid;
3118        lkb->lkb_remid = ms->m_lkid;
3119        lkb->lkb_grmode = DLM_LOCK_IV;
3120        lkb->lkb_rqmode = ms->m_rqmode;
3121
3122        lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3123        lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3124
3125        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3126                /* lkb was just created so there won't be an lvb yet */
3127                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3128                if (!lkb->lkb_lvbptr)
3129                        return -ENOMEM;
3130        }
3131
3132        return 0;
3133}
3134
3135static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3136                                struct dlm_message *ms)
3137{
3138        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3139                return -EBUSY;
3140
3141        if (receive_lvb(ls, lkb, ms))
3142                return -ENOMEM;
3143
3144        lkb->lkb_rqmode = ms->m_rqmode;
3145        lkb->lkb_lvbseq = ms->m_lvbseq;
3146
3147        return 0;
3148}
3149
/* The only thing taken from a received unlock message is the lvb.
   Returns 0, or -ENOMEM if receive_lvb() fails. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                               struct dlm_message *ms)
{
        return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3157
3158/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3159   uses to send a reply and that the remote end uses to process the reply. */
3160
3161static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3162{
3163        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3164        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3165        lkb->lkb_remid = ms->m_lkid;
3166}
3167
3168/* This is called after the rsb is locked so that we can safely inspect
3169   fields in the lkb. */
3170
3171static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3172{
3173        int from = ms->m_header.h_nodeid;
3174        int error = 0;
3175
3176        switch (ms->m_type) {
3177        case DLM_MSG_CONVERT:
3178        case DLM_MSG_UNLOCK:
3179        case DLM_MSG_CANCEL:
3180                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3181                        error = -EINVAL;
3182                break;
3183
3184        case DLM_MSG_CONVERT_REPLY:
3185        case DLM_MSG_UNLOCK_REPLY:
3186        case DLM_MSG_CANCEL_REPLY:
3187        case DLM_MSG_GRANT:
3188        case DLM_MSG_BAST:
3189                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3190                        error = -EINVAL;
3191                break;
3192
3193        case DLM_MSG_REQUEST_REPLY:
3194                if (!is_process_copy(lkb))
3195                        error = -EINVAL;
3196                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3197                        error = -EINVAL;
3198                break;
3199
3200        default:
3201                error = -EINVAL;
3202        }
3203
3204        if (error)
3205                log_error(lkb->lkb_resource->res_ls,
3206                          "ignore invalid message %d from %d %x %x %x %d",
3207                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3208                          lkb->lkb_flags, lkb->lkb_nodeid);
3209        return error;
3210}
3211
3212static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3213{
3214        struct dlm_lkb *lkb;
3215        struct dlm_rsb *r;
3216        int error, namelen;
3217
3218        error = create_lkb(ls, &lkb);
3219        if (error)
3220                goto fail;
3221
3222        receive_flags(lkb, ms);
3223        lkb->lkb_flags |= DLM_IFL_MSTCPY;
3224        error = receive_request_args(ls, lkb, ms);
3225        if (error) {
3226                __put_lkb(ls, lkb);
3227                goto fail;
3228        }
3229
3230        namelen = receive_extralen(ms);
3231
3232        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3233        if (error) {
3234                __put_lkb(ls, lkb);
3235                goto fail;
3236        }
3237
3238        lock_rsb(r);
3239
3240        attach_lkb(r, lkb);
3241        error = do_request(r, lkb);
3242        send_request_reply(r, lkb, error);
3243        do_request_effects(r, lkb, error);
3244
3245        unlock_rsb(r);
3246        put_rsb(r);
3247
3248        if (error == -EINPROGRESS)
3249                error = 0;
3250        if (error)
3251                dlm_put_lkb(lkb);
3252        return;
3253
3254 fail:
3255        setup_stub_lkb(ls, ms);
3256        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3257}
3258
3259static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3260{
3261        struct dlm_lkb *lkb;
3262        struct dlm_rsb *r;
3263        int error, reply = 1;
3264
3265        error = find_lkb(ls, ms->m_remid, &lkb);
3266        if (error)
3267                goto fail;
3268
3269        r = lkb->lkb_resource;
3270
3271        hold_rsb(r);
3272        lock_rsb(r);
3273
3274        error = validate_message(lkb, ms);
3275        if (error)
3276                goto out;
3277
3278        receive_flags(lkb, ms);
3279
3280        error = receive_convert_args(ls, lkb, ms);
3281        if (error) {
3282                send_convert_reply(r, lkb, error);
3283                goto out;
3284        }
3285
3286        reply = !down_conversion(lkb);
3287
3288        error = do_convert(r, lkb);
3289        if (reply)
3290                send_convert_reply(r, lkb, error);
3291        do_convert_effects(r, lkb, error);
3292 out:
3293        unlock_rsb(r);
3294        put_rsb(r);
3295        dlm_put_lkb(lkb);
3296        return;
3297
3298 fail:
3299        setup_stub_lkb(ls, ms);
3300        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3301}
3302
3303static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3304{
3305        struct dlm_lkb *lkb;
3306        struct dlm_rsb *r;
3307        int error;
3308
3309        error = find_lkb(ls, ms->m_remid, &lkb);
3310        if (error)
3311                goto fail;
3312
3313        r = lkb->lkb_resource;
3314
3315        hold_rsb(r);
3316        lock_rsb(r);
3317
3318        error = validate_message(lkb, ms);
3319        if (error)
3320                goto out;
3321
3322        receive_flags(lkb, ms);
3323
3324        error = receive_unlock_args(ls, lkb, ms);
3325        if (error) {
3326                send_unlock_reply(r, lkb, error);
3327                goto out;
3328        }
3329
3330        error = do_unlock(r, lkb);
3331        send_unlock_reply(r, lkb, error);
3332        do_unlock_effects(r, lkb, error);
3333 out:
3334        unlock_rsb(r);
3335        put_rsb(r);
3336        dlm_put_lkb(lkb);
3337        return;
3338
3339 fail:
3340        setup_stub_lkb(ls, ms);
3341        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3342}
3343
3344static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3345{
3346        struct dlm_lkb *lkb;
3347        struct dlm_rsb *r;
3348        int error;
3349
3350        error = find_lkb(ls, ms->m_remid, &lkb);
3351        if (error)
3352                goto fail;
3353
3354        receive_flags(lkb, ms);
3355
3356        r = lkb->lkb_resource;
3357
3358        hold_rsb(r);
3359        lock_rsb(r);
3360
3361        error = validate_message(lkb, ms);
3362        if (error)
3363                goto out;
3364
3365        error = do_cancel(r, lkb);
3366        send_cancel_reply(r, lkb, error);
3367        do_cancel_effects(r, lkb, error);
3368 out:
3369        unlock_rsb(r);
3370        put_rsb(r);
3371        dlm_put_lkb(lkb);
3372        return;
3373
3374 fail:
3375        setup_stub_lkb(ls, ms);
3376        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3377}
3378
3379static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3380{
3381        struct dlm_lkb *lkb;
3382        struct dlm_rsb *r;
3383        int error;
3384
3385        error = find_lkb(ls, ms->m_remid, &lkb);
3386        if (error) {
3387                log_debug(ls, "receive_grant from %d no lkb %x",
3388                          ms->m_header.h_nodeid, ms->m_remid);
3389                return;
3390        }
3391
3392        r = lkb->lkb_resource;
3393
3394        hold_rsb(r);
3395        lock_rsb(r);
3396
3397        error = validate_message(lkb, ms);
3398        if (error)
3399                goto out;
3400
3401        receive_flags_reply(lkb, ms);
3402        if (is_altmode(lkb))
3403                munge_altmode(lkb, ms);
3404        grant_lock_pc(r, lkb, ms);
3405        queue_cast(r, lkb, 0);
3406 out:
3407        unlock_rsb(r);
3408        put_rsb(r);
3409        dlm_put_lkb(lkb);
3410}
3411
3412static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3413{
3414        struct dlm_lkb *lkb;
3415        struct dlm_rsb *r;
3416        int error;
3417
3418        error = find_lkb(ls, ms->m_remid, &lkb);
3419        if (error) {
3420                log_debug(ls, "receive_bast from %d no lkb %x",
3421                          ms->m_header.h_nodeid, ms->m_remid);
3422                return;
3423        }
3424
3425        r = lkb->lkb_resource;
3426
3427        hold_rsb(r);
3428        lock_rsb(r);
3429
3430        error = validate_message(lkb, ms);
3431        if (error)
3432                goto out;
3433
3434        queue_bast(r, lkb, ms->m_bastmode);
3435 out:
3436        unlock_rsb(r);
3437        put_rsb(r);
3438        dlm_put_lkb(lkb);
3439}
3440
3441static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3442{
3443        int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3444
3445        from_nodeid = ms->m_header.h_nodeid;
3446        our_nodeid = dlm_our_nodeid();
3447
3448        len = receive_extralen(ms);
3449
3450        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3451        if (dir_nodeid != our_nodeid) {
3452                log_error(ls, "lookup dir_nodeid %d from %d",
3453                          dir_nodeid, from_nodeid);
3454                error = -EINVAL;
3455                ret_nodeid = -1;
3456                goto out;
3457        }
3458
3459        error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3460
3461        /* Optimization: we're master so treat lookup as a request */
3462        if (!error && ret_nodeid == our_nodeid) {
3463                receive_request(ls, ms);
3464                return;
3465        }
3466 out:
3467        send_lookup_reply(ls, ms, ret_nodeid, error);
3468}
3469
3470static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3471{
3472        int len, dir_nodeid, from_nodeid;
3473
3474        from_nodeid = ms->m_header.h_nodeid;
3475
3476        len = receive_extralen(ms);
3477
3478        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3479        if (dir_nodeid != dlm_our_nodeid()) {
3480                log_error(ls, "remove dir entry dir_nodeid %d from %d",
3481                          dir_nodeid, from_nodeid);
3482                return;
3483        }
3484
3485        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3486}
3487
3488static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3489{
3490        do_purge(ls, ms->m_nodeid, ms->m_pid);
3491}
3492
3493static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3494{
3495        struct dlm_lkb *lkb;
3496        struct dlm_rsb *r;
3497        int error, mstype, result;
3498
3499        error = find_lkb(ls, ms->m_remid, &lkb);
3500        if (error) {
3501                log_debug(ls, "receive_request_reply from %d no lkb %x",
3502                          ms->m_header.h_nodeid, ms->m_remid);
3503                return;
3504        }
3505
3506        r = lkb->lkb_resource;
3507        hold_rsb(r);
3508        lock_rsb(r);
3509
3510        error = validate_message(lkb, ms);
3511        if (error)
3512                goto out;
3513
3514        mstype = lkb->lkb_wait_type;
3515        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3516        if (error)
3517                goto out;
3518
3519        /* Optimization: the dir node was also the master, so it took our
3520           lookup as a request and sent request reply instead of lookup reply */
3521        if (mstype == DLM_MSG_LOOKUP) {
3522                r->res_nodeid = ms->m_header.h_nodeid;
3523                lkb->lkb_nodeid = r->res_nodeid;
3524        }
3525
3526        /* this is the value returned from do_request() on the master */
3527        result = ms->m_result;
3528
3529        switch (result) {
3530        case -EAGAIN:
3531                /* request would block (be queued) on remote master */
3532                queue_cast(r, lkb, -EAGAIN);
3533                confirm_master(r, -EAGAIN);
3534                unhold_lkb(lkb); /* undoes create_lkb() */
3535                break;
3536
3537        case -EINPROGRESS:
3538        case 0:
3539                /* request was queued or granted on remote master */
3540                receive_flags_reply(lkb, ms);
3541                lkb->lkb_remid = ms->m_lkid;
3542                if (is_altmode(lkb))
3543                        munge_altmode(lkb, ms);
3544                if (result) {
3545                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
3546                        add_timeout(lkb);
3547                } else {
3548                        grant_lock_pc(r, lkb, ms);
3549                        queue_cast(r, lkb, 0);
3550                }
3551                confirm_master(r, result);
3552                break;
3553
3554        case -EBADR:
3555        case -ENOTBLK:
3556                /* find_rsb failed to find rsb or rsb wasn't master */
3557                log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3558                          lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3559                r->res_nodeid = -1;
3560                lkb->lkb_nodeid = -1;
3561
3562                if (is_overlap(lkb)) {
3563                        /* we'll ignore error in cancel/unlock reply */
3564                        queue_cast_overlap(r, lkb);
3565                        confirm_master(r, result);
3566                        unhold_lkb(lkb); /* undoes create_lkb() */
3567                } else
3568                        _request_lock(r, lkb);
3569                break;
3570
3571        default:
3572                log_error(ls, "receive_request_reply %x error %d",
3573                          lkb->lkb_id, result);
3574        }
3575
3576        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3577                log_debug(ls, "receive_request_reply %x result %d unlock",
3578                          lkb->lkb_id, result);
3579                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3580                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3581                send_unlock(r, lkb);
3582        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3583                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3584                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3585                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3586                send_cancel(r, lkb);
3587        } else {
3588                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3589                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3590        }
3591 out:
3592        unlock_rsb(r);
3593        put_rsb(r);
3594        dlm_put_lkb(lkb);
3595}
3596
3597static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3598                                    struct dlm_message *ms)
3599{
3600        /* this is the value returned from do_convert() on the master */
3601        switch (ms->m_result) {
3602        case -EAGAIN:
3603                /* convert would block (be queued) on remote master */
3604                queue_cast(r, lkb, -EAGAIN);
3605                break;
3606
3607        case -EDEADLK:
3608                receive_flags_reply(lkb, ms);
3609                revert_lock_pc(r, lkb);
3610                queue_cast(r, lkb, -EDEADLK);
3611                break;
3612
3613        case -EINPROGRESS:
3614                /* convert was queued on remote master */
3615                receive_flags_reply(lkb, ms);
3616                if (is_demoted(lkb))
3617                        munge_demoted(lkb, ms);
3618                del_lkb(r, lkb);
3619                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3620                add_timeout(lkb);
3621                break;
3622
3623        case 0:
3624                /* convert was granted on remote master */
3625                receive_flags_reply(lkb, ms);
3626                if (is_demoted(lkb))
3627                        munge_demoted(lkb, ms);
3628                grant_lock_pc(r, lkb, ms);
3629                queue_cast(r, lkb, 0);
3630                break;
3631
3632        default:
3633                log_error(r->res_ls, "receive_convert_reply %x error %d",
3634                          lkb->lkb_id, ms->m_result);
3635        }
3636}
3637
3638static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3639{
3640        struct dlm_rsb *r = lkb->lkb_resource;
3641        int error;
3642
3643        hold_rsb(r);
3644        lock_rsb(r);
3645
3646        error = validate_message(lkb, ms);
3647        if (error)
3648                goto out;
3649
3650        /* stub reply can happen with waiters_mutex held */
3651        error = remove_from_waiters_ms(lkb, ms);
3652        if (error)
3653                goto out;
3654
3655        __receive_convert_reply(r, lkb, ms);
3656 out:
3657        unlock_rsb(r);
3658        put_rsb(r);
3659}
3660
3661static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3662{
3663        struct dlm_lkb *lkb;
3664        int error;
3665
3666        error = find_lkb(ls, ms->m_remid, &lkb);
3667        if (error) {
3668                log_debug(ls, "receive_convert_reply from %d no lkb %x",
3669                          ms->m_header.h_nodeid, ms->m_remid);
3670                return;
3671        }
3672
3673        _receive_convert_reply(lkb, ms);
3674        dlm_put_lkb(lkb);
3675}
3676
3677static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3678{
3679        struct dlm_rsb *r = lkb->lkb_resource;
3680        int error;
3681
3682        hold_rsb(r);
3683        lock_rsb(r);
3684
3685        error = validate_message(lkb, ms);
3686        if (error)
3687                goto out;
3688
3689        /* stub reply can happen with waiters_mutex held */
3690        error = remove_from_waiters_ms(lkb, ms);
3691        if (error)
3692                goto out;
3693
3694        /* this is the value returned from do_unlock() on the master */
3695
3696        switch (ms->m_result) {
3697        case -DLM_EUNLOCK:
3698                receive_flags_reply(lkb, ms);
3699                remove_lock_pc(r, lkb);
3700                queue_cast(r, lkb, -DLM_EUNLOCK);
3701                break;
3702        case -ENOENT:
3703                break;
3704        default:
3705                log_error(r->res_ls, "receive_unlock_reply %x error %d",
3706                          lkb->lkb_id, ms->m_result);
3707        }
3708 out:
3709        unlock_rsb(r);
3710        put_rsb(r);
3711}
3712
3713static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3714{
3715        struct dlm_lkb *lkb;
3716        int error;
3717
3718        error = find_lkb(ls, ms->m_remid, &lkb);
3719        if (error) {
3720                log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3721                          ms->m_header.h_nodeid, ms->m_remid);
3722                return;
3723        }
3724
3725        _receive_unlock_reply(lkb, ms);
3726        dlm_put_lkb(lkb);
3727}
3728
3729static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3730{
3731        struct dlm_rsb *r = lkb->lkb_resource;
3732        int error;
3733
3734        hold_rsb(r);
3735        lock_rsb(r);
3736
3737        error = validate_message(lkb, ms);
3738        if (error)
3739                goto out;
3740
3741        /* stub reply can happen with waiters_mutex held */
3742        error = remove_from_waiters_ms(lkb, ms);
3743        if (error)
3744                goto out;
3745
3746        /* this is the value returned from do_cancel() on the master */
3747
3748        switch (ms->m_result) {
3749        case -DLM_ECANCEL:
3750                receive_flags_reply(lkb, ms);
3751                revert_lock_pc(r, lkb);
3752                queue_cast(r, lkb, -DLM_ECANCEL);
3753                break;
3754        case 0:
3755                break;
3756        default:
3757                log_error(r->res_ls, "receive_cancel_reply %x error %d",
3758                          lkb->lkb_id, ms->m_result);
3759        }
3760 out:
3761        unlock_rsb(r);
3762        put_rsb(r);
3763}
3764
3765static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3766{
3767        struct dlm_lkb *lkb;
3768        int error;
3769
3770        error = find_lkb(ls, ms->m_remid, &lkb);
3771        if (error) {
3772                log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3773                          ms->m_header.h_nodeid, ms->m_remid);
3774                return;
3775        }
3776
3777        _receive_cancel_reply(lkb, ms);
3778        dlm_put_lkb(lkb);
3779}
3780
3781static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3782{
3783        struct dlm_lkb *lkb;
3784        struct dlm_rsb *r;
3785        int error, ret_nodeid;
3786
3787        error = find_lkb(ls, ms->m_lkid, &lkb);
3788        if (error) {
3789                log_error(ls, "receive_lookup_reply no lkb");
3790                return;
3791        }
3792
3793        /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3794           FIXME: will a non-zero error ever be returned? */
3795
3796        r = lkb->lkb_resource;
3797        hold_rsb(r);
3798        lock_rsb(r);
3799
3800        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3801        if (error)
3802                goto out;
3803
3804        ret_nodeid = ms->m_nodeid;
3805        if (ret_nodeid == dlm_our_nodeid()) {
3806                r->res_nodeid = 0;
3807                ret_nodeid = 0;
3808                r->res_first_lkid = 0;
3809        } else {
3810                /* set_master() will copy res_nodeid to lkb_nodeid */
3811                r->res_nodeid = ret_nodeid;
3812        }
3813
3814        if (is_overlap(lkb)) {
3815                log_debug(ls, "receive_lookup_reply %x unlock %x",
3816                          lkb->lkb_id, lkb->lkb_flags);
3817                queue_cast_overlap(r, lkb);
3818                unhold_lkb(lkb); /* undoes create_lkb() */
3819                goto out_list;
3820        }
3821
3822        _request_lock(r, lkb);
3823
3824 out_list:
3825        if (!ret_nodeid)
3826                process_lookup_list(r);
3827 out:
3828        unlock_rsb(r);
3829        put_rsb(r);
3830        dlm_put_lkb(lkb);
3831}
3832
3833static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3834{
3835        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3836                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3837                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3838                          ms->m_remid, ms->m_result);
3839                return;
3840        }
3841
3842        switch (ms->m_type) {
3843
3844        /* messages sent to a master node */
3845
3846        case DLM_MSG_REQUEST:
3847                receive_request(ls, ms);
3848                break;
3849
3850        case DLM_MSG_CONVERT:
3851                receive_convert(ls, ms);
3852                break;
3853
3854        case DLM_MSG_UNLOCK:
3855                receive_unlock(ls, ms);
3856                break;
3857
3858        case DLM_MSG_CANCEL:
3859                receive_cancel(ls, ms);
3860                break;
3861
3862        /* messages sent from a master node (replies to above) */
3863
3864        case DLM_MSG_REQUEST_REPLY:
3865                receive_request_reply(ls, ms);
3866                break;
3867
3868        case DLM_MSG_CONVERT_REPLY:
3869                receive_convert_reply(ls, ms);
3870                break;
3871
3872        case DLM_MSG_UNLOCK_REPLY:
3873                receive_unlock_reply(ls, ms);
3874                break;
3875
3876        case DLM_MSG_CANCEL_REPLY:
3877                receive_cancel_reply(ls, ms);
3878                break;
3879
3880        /* messages sent from a master node (only two types of async msg) */
3881
3882        case DLM_MSG_GRANT:
3883                receive_grant(ls, ms);
3884                break;
3885
3886        case DLM_MSG_BAST:
3887                receive_bast(ls, ms);
3888                break;
3889
3890        /* messages sent to a dir node */
3891
3892        case DLM_MSG_LOOKUP:
3893                receive_lookup(ls, ms);
3894                break;
3895
3896        case DLM_MSG_REMOVE:
3897                receive_remove(ls, ms);
3898                break;
3899
3900        /* messages sent from a dir node (remove has no reply) */
3901
3902        case DLM_MSG_LOOKUP_REPLY:
3903                receive_lookup_reply(ls, ms);
3904                break;
3905
3906        /* other messages */
3907
3908        case DLM_MSG_PURGE:
3909                receive_purge(ls, ms);
3910                break;
3911
3912        default:
3913                log_error(ls, "unknown message type %d", ms->m_type);
3914        }
3915
3916        dlm_astd_wake();
3917}
3918
/* While the lockspace is in recovery mode (locking stopped), normal messages
   are saved on the requestqueue and processed once recovery is done.  When
   not in recovery mode, we first wait for dlm_recoverd to drain the saved
   messages off the requestqueue and only then process the new one.  That wait
   matters right after recovery completes, when we move from saving all
   messages on the requestqueue, to processing the saved messages, to
   processing new messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (!dlm_locking_stopped(ls)) {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
		return;
	}

	dlm_add_requestqueue(ls, nodeid, ms);
}
3937
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message goes straight to the normal dispatcher. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3945
3946/* This is called by the midcomms layer when something is received for
3947   the lockspace.  It could be either a MSG (normal message sent as part of
3948   standard locking activity) or an RCOM (recovery message sent as part of
3949   lockspace recovery). */
3950
3951void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3952{
3953        struct dlm_header *hd = &p->header;
3954        struct dlm_ls *ls;
3955        int type = 0;
3956
3957        switch (hd->h_cmd) {
3958        case DLM_MSG:
3959                dlm_message_in(&p->message);
3960                type = p->message.m_type;
3961                break;
3962        case DLM_RCOM:
3963                dlm_rcom_in(&p->rcom);
3964                type = p->rcom.rc_type;
3965                break;
3966        default:
3967                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3968                return;
3969        }
3970
3971        if (hd->h_nodeid != nodeid) {
3972                log_print("invalid h_nodeid %d from %d lockspace %x",
3973                          hd->h_nodeid, nodeid, hd->h_lockspace);
3974                return;
3975        }
3976
3977        ls = dlm_find_lockspace_global(hd->h_lockspace);
3978        if (!ls) {
3979                if (dlm_config.ci_log_debug)
3980                        log_print("invalid lockspace %x from %d cmd %d type %d",
3981                                  hd->h_lockspace, nodeid, hd->h_cmd, type);
3982
3983                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3984                        dlm_send_ls_not_ready(nodeid, &p->rcom);
3985                return;
3986        }
3987
3988        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3989           be inactive (in this ls) before transitioning to recovery mode */
3990
3991        down_read(&ls->ls_recv_active);
3992        if (hd->h_cmd == DLM_MSG)
3993                dlm_receive_message(ls, &p->message, nodeid);
3994        else
3995                dlm_receive_rcom(ls, &p->rcom, nodeid);
3996        up_read(&ls->ls_recv_active);
3997
3998        dlm_put_lockspace(ls);
3999}
4000
4001static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4002{
4003        if (middle_conversion(lkb)) {
4004                hold_lkb(lkb);
4005                ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4006                ls->ls_stub_ms.m_result = -EINPROGRESS;
4007                ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4008                ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4009                _receive_convert_reply(lkb, &ls->ls_stub_ms);
4010
4011                /* Same special case as in receive_rcom_lock_args() */
4012                lkb->lkb_grmode = DLM_LOCK_IV;
4013                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4014                unhold_lkb(lkb);
4015
4016        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4017                lkb->lkb_flags |= DLM_IFL_RESEND;
4018        }
4019
4020        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4021           conversions are async; there's no reply from the remote master */
4022}
4023
4024/* A waiting lkb needs recovery if the master node has failed, or
4025   the master node is changing (only when no directory is used) */
4026
4027static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4028{
4029        if (dlm_is_removed(ls, lkb->lkb_nodeid))
4030                return 1;
4031
4032        if (!dlm_no_directory(ls))
4033                return 0;
4034
4035        if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4036                return 1;
4037
4038        return 0;
4039}
4040
4041/* Recovery for locks that are waiting for replies from nodes that are now
4042   gone.  We can just complete unlocks and cancels by faking a reply from the
4043   dead node.  Requests and up-conversions we flag to be resent after
4044   recovery.  Down-conversions can just be completed with a fake reply like
4045   unlocks.  Conversions between PR and CW need special attention. */
4046
4047void dlm_recover_waiters_pre(struct dlm_ls *ls)
4048{
4049        struct dlm_lkb *lkb, *safe;
4050        int wait_type, stub_unlock_result, stub_cancel_result;
4051
4052        mutex_lock(&ls->ls_waiters_mutex);
4053
4054        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4055                log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4056                          lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4057
4058                /* all outstanding lookups, regardless of destination  will be
4059                   resent after recovery is done */
4060
4061                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4062                        lkb->lkb_flags |= DLM_IFL_RESEND;
4063                        continue;
4064                }
4065
4066                if (!waiter_needs_recovery(ls, lkb))
4067                        continue;
4068
4069                wait_type = lkb->lkb_wait_type;
4070                stub_unlock_result = -DLM_EUNLOCK;
4071                stub_cancel_result = -DLM_ECANCEL;
4072
4073                /* Main reply may have been received leaving a zero wait_type,
4074                   but a reply for the overlapping op may not have been
4075                   received.  In that case we need to fake the appropriate
4076                   reply for the overlap op. */
4077
4078                if (!wait_type) {
4079                        if (is_overlap_cancel(lkb)) {
4080                                wait_type = DLM_MSG_CANCEL;
4081                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4082                                        stub_cancel_result = 0;
4083                        }
4084                        if (is_overlap_unlock(lkb)) {
4085                                wait_type = DLM_MSG_UNLOCK;
4086                                if (lkb->lkb_grmode == DLM_LOCK_IV)
4087                                        stub_unlock_result = -ENOENT;
4088                        }
4089
4090                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
4091                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
4092                                  stub_cancel_result, stub_unlock_result);
4093                }
4094
4095                switch (wait_type) {
4096
4097                case DLM_MSG_REQUEST:
4098                        lkb->lkb_flags |= DLM_IFL_RESEND;
4099                        break;
4100
4101                case DLM_MSG_CONVERT:
4102                        recover_convert_waiter(ls, lkb);
4103                        break;
4104
4105                case DLM_MSG_UNLOCK:
4106                        hold_lkb(lkb);
4107                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4108                        ls->ls_stub_ms.m_result = stub_unlock_result;
4109                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4110                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4111                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4112                        dlm_put_lkb(lkb);
4113                        break;
4114
4115                case DLM_MSG_CANCEL:
4116                        hold_lkb(lkb);
4117                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4118                        ls->ls_stub_ms.m_result = stub_cancel_result;
4119                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4120                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4121                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4122                        dlm_put_lkb(lkb);
4123                        break;
4124
4125                default:
4126                        log_error(ls, "invalid lkb wait_type %d %d",
4127                                  lkb->lkb_wait_type, wait_type);
4128                }
4129                schedule();
4130        }
4131        mutex_unlock(&ls->ls_waiters_mutex);
4132}
4133
4134static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4135{
4136        struct dlm_lkb *lkb;
4137        int found = 0;
4138
4139        mutex_lock(&ls->ls_waiters_mutex);
4140        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4141                if (lkb->lkb_flags & DLM_IFL_RESEND) {
4142                        hold_lkb(lkb);
4143                        found = 1;
4144                        break;
4145                }
4146        }
4147        mutex_unlock(&ls->ls_waiters_mutex);
4148
4149        if (!found)
4150                lkb = NULL;
4151        return lkb;
4152}
4153
4154/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4155   master or dir-node for r.  Processing the lkb may result in it being placed
4156   back on waiters. */
4157
4158/* We do this after normal locking has been enabled and any saved messages
4159   (in requestqueue) have been processed.  We should be confident that at
4160   this point we won't get or process a reply to any of these waiting
4161   operations.  But, new ops may be coming in on the rsbs/locks here from
4162   userspace or remotely. */
4163
4164/* there may have been an overlap unlock/cancel prior to recovery or after
4165   recovery.  if before, the lkb may still have a pos wait_count; if after, the
4166   overlap flag would just have been set and nothing new sent.  we can be
4167   confident here than any replies to either the initial op or overlap ops
4168   prior to recovery have been received. */
4169
4170int dlm_recover_waiters_post(struct dlm_ls *ls)
4171{
4172        struct dlm_lkb *lkb;
4173        struct dlm_rsb *r;
4174        int error = 0, mstype, err, oc, ou;
4175
4176        while (1) {
4177                if (dlm_locking_stopped(ls)) {
4178                        log_debug(ls, "recover_waiters_post aborted");
4179                        error = -EINTR;
4180                        break;
4181                }
4182
4183                lkb = find_resend_waiter(ls);
4184                if (!lkb)
4185                        break;
4186
4187                r = lkb->lkb_resource;
4188                hold_rsb(r);
4189                lock_rsb(r);
4190
4191                mstype = lkb->lkb_wait_type;
4192                oc = is_overlap_cancel(lkb);
4193                ou = is_overlap_unlock(lkb);
4194                err = 0;
4195
4196                log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4197                          lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4198
4199                /* At this point we assume that we won't get a reply to any
4200                   previous op or overlap op on this lock.  First, do a big
4201                   remove_from_waiters() for all previous ops. */
4202
4203                lkb->lkb_flags &= ~DLM_IFL_RESEND;
4204                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4205                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4206                lkb->lkb_wait_type = 0;
4207                lkb->lkb_wait_count = 0;
4208                mutex_lock(&ls->ls_waiters_mutex);
4209                list_del_init(&lkb->lkb_wait_reply);
4210                mutex_unlock(&ls->ls_waiters_mutex);
4211                unhold_lkb(lkb); /* for waiters list */
4212
4213                if (oc || ou) {
4214                        /* do an unlock or cancel instead of resending */
4215                        switch (mstype) {
4216                        case DLM_MSG_LOOKUP:
4217                        case DLM_MSG_REQUEST:
4218                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4219                                                        -DLM_ECANCEL);
4220                                unhold_lkb(lkb); /* undoes create_lkb() */
4221                                break;
4222                        case DLM_MSG_CONVERT:
4223                                if (oc) {
4224                                        queue_cast(r, lkb, -DLM_ECANCEL);
4225                                } else {
4226                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4227                                        _unlock_lock(r, lkb);
4228                                }
4229                                break;
4230                        default:
4231                                err = 1;
4232                        }
4233                } else {
4234                        switch (mstype) {
4235                        case DLM_MSG_LOOKUP:
4236                        case DLM_MSG_REQUEST:
4237                                _request_lock(r, lkb);
4238                                if (is_master(r))
4239                                        confirm_master(r, 0);
4240                                break;
4241                        case DLM_MSG_CONVERT:
4242                                _convert_lock(r, lkb);
4243                                break;
4244                        default:
4245                                err = 1;
4246                        }
4247                }
4248
4249                if (err)
4250                        log_error(ls, "recover_waiters_post %x %d %x %d %d",
4251                                  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4252                unlock_rsb(r);
4253                put_rsb(r);
4254                dlm_put_lkb(lkb);
4255        }
4256
4257        return error;
4258}
4259
4260static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4261                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4262{
4263        struct dlm_ls *ls = r->res_ls;
4264        struct dlm_lkb *lkb, *safe;
4265
4266        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4267                if (test(ls, lkb)) {
4268                        rsb_set_flag(r, RSB_LOCKS_PURGED);
4269                        del_lkb(r, lkb);
4270                        /* this put should free the lkb */
4271                        if (!dlm_put_lkb(lkb))
4272                                log_error(ls, "purged lkb not released");
4273                }
4274        }
4275}
4276
4277static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4278{
4279        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4280}
4281
/* purge_queue() predicate selecting every master-copy lkb; returns whatever
   is_master_copy() returns.  ls is not used. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
4286
4287static void purge_dead_locks(struct dlm_rsb *r)
4288{
4289        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4290        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4291        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4292}
4293
4294void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4295{
4296        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4297        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4298        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4299}
4300
4301/* Get rid of locks held by nodes that are gone. */
4302
4303int dlm_purge_locks(struct dlm_ls *ls)
4304{
4305        struct dlm_rsb *r;
4306
4307        log_debug(ls, "dlm_purge_locks");
4308
4309        down_write(&ls->ls_root_sem);
4310        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4311                hold_rsb(r);
4312                lock_rsb(r);
4313                if (is_master(r))
4314                        purge_dead_locks(r);
4315                unlock_rsb(r);
4316                unhold_rsb(r);
4317
4318                schedule();
4319        }
4320        up_write(&ls->ls_root_sem);
4321
4322        return 0;
4323}
4324
4325static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4326{
4327        struct dlm_rsb *r, *r_ret = NULL;
4328
4329        spin_lock(&ls->ls_rsbtbl[bucket].lock);
4330        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4331                if (!rsb_flag(r, RSB_LOCKS_PURGED))
4332                        continue;
4333                hold_rsb(r);
4334                rsb_clear_flag(r, RSB_LOCKS_PURGED);
4335                r_ret = r;
4336                break;
4337        }
4338        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4339        return r_ret;
4340}
4341
4342void dlm_grant_after_purge(struct dlm_ls *ls)
4343{
4344        struct dlm_rsb *r;
4345        int bucket = 0;
4346
4347        while (1) {
4348                r = find_purged_rsb(ls, bucket);
4349                if (!r) {
4350                        if (bucket == ls->ls_rsbtbl_size - 1)
4351                                break;
4352                        bucket++;
4353                        continue;
4354                }
4355                lock_rsb(r);
4356                if (is_master(r)) {
4357                        grant_pending_locks(r);
4358                        confirm_master(r, 0);
4359                }
4360                unlock_rsb(r);
4361                put_rsb(r);
4362                schedule();
4363        }
4364}
4365
4366static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4367                                         uint32_t remid)
4368{
4369        struct dlm_lkb *lkb;
4370
4371        list_for_each_entry(lkb, head, lkb_statequeue) {
4372                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4373                        return lkb;
4374        }
4375        return NULL;
4376}
4377
4378static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4379                                    uint32_t remid)
4380{
4381        struct dlm_lkb *lkb;
4382
4383        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4384        if (lkb)
4385                return lkb;
4386        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4387        if (lkb)
4388                return lkb;
4389        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4390        if (lkb)
4391                return lkb;
4392        return NULL;
4393}
4394
4395/* needs at least dlm_rcom + rcom_lock */
4396static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4397                                  struct dlm_rsb *r, struct dlm_rcom *rc)
4398{
4399        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4400
4401        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4402        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4403        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4404        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4405        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4406        lkb->lkb_flags |= DLM_IFL_MSTCPY;
4407        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4408        lkb->lkb_rqmode = rl->rl_rqmode;
4409        lkb->lkb_grmode = rl->rl_grmode;
4410        /* don't set lkb_status because add_lkb wants to itself */
4411
4412        lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4413        lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4414
4415        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4416                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4417                         sizeof(struct rcom_lock);
4418                if (lvblen > ls->ls_lvblen)
4419                        return -EINVAL;
4420                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4421                if (!lkb->lkb_lvbptr)
4422                        return -ENOMEM;
4423                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4424        }
4425
4426        /* Conversions between PR and CW (middle modes) need special handling.
4427           The real granted mode of these converting locks cannot be determined
4428           until all locks have been rebuilt on the rsb (recover_conversion) */
4429
4430        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4431            middle_conversion(lkb)) {
4432                rl->rl_status = DLM_LKSTS_CONVERT;
4433                lkb->lkb_grmode = DLM_LOCK_IV;
4434                rsb_set_flag(r, RSB_RECOVER_CONVERT);
4435        }
4436
4437        return 0;
4438}
4439
4440/* This lkb may have been recovered in a previous aborted recovery so we need
4441   to check if the rsb already has an lkb with the given remote nodeid/lkid.
4442   If so we just send back a standard reply.  If not, we create a new lkb with
4443   the given values and send back our lkid.  We send back our lkid by sending
4444   back the rcom_lock struct we got but with the remid field filled in. */
4445
4446/* needs at least dlm_rcom + rcom_lock */
4447int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4448{
4449        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4450        struct dlm_rsb *r;
4451        struct dlm_lkb *lkb;
4452        int error;
4453
4454        if (rl->rl_parent_lkid) {
4455                error = -EOPNOTSUPP;
4456                goto out;
4457        }
4458
4459        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4460                         R_MASTER, &r);
4461        if (error)
4462                goto out;
4463
4464        lock_rsb(r);
4465
4466        lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4467        if (lkb) {
4468                error = -EEXIST;
4469                goto out_remid;
4470        }
4471
4472        error = create_lkb(ls, &lkb);
4473        if (error)
4474                goto out_unlock;
4475
4476        error = receive_rcom_lock_args(ls, lkb, r, rc);
4477        if (error) {
4478                __put_lkb(ls, lkb);
4479                goto out_unlock;
4480        }
4481
4482        attach_lkb(r, lkb);
4483        add_lkb(r, lkb, rl->rl_status);
4484        error = 0;
4485
4486 out_remid:
4487        /* this is the new value returned to the lock holder for
4488           saving in its process-copy lkb */
4489        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4490
4491 out_unlock:
4492        unlock_rsb(r);
4493        put_rsb(r);
4494 out:
4495        if (error)
4496                log_debug(ls, "recover_master_copy %d %x", error,
4497                          le32_to_cpu(rl->rl_lkid));
4498        rl->rl_result = cpu_to_le32(error);
4499        return error;
4500}
4501
4502/* needs at least dlm_rcom + rcom_lock */
4503int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4504{
4505        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4506        struct dlm_rsb *r;
4507        struct dlm_lkb *lkb;
4508        int error;
4509
4510        error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4511        if (error) {
4512                log_error(ls, "recover_process_copy no lkid %x",
4513                                le32_to_cpu(rl->rl_lkid));
4514                return error;
4515        }
4516
4517        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4518
4519        error = le32_to_cpu(rl->rl_result);
4520
4521        r = lkb->lkb_resource;
4522        hold_rsb(r);
4523        lock_rsb(r);
4524
4525        switch (error) {
4526        case -EBADR:
4527                /* There's a chance the new master received our lock before
4528                   dlm_recover_master_reply(), this wouldn't happen if we did
4529                   a barrier between recover_masters and recover_locks. */
4530                log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4531                          (unsigned long)r, r->res_name);
4532                dlm_send_rcom_lock(r, lkb);
4533                goto out;
4534        case -EEXIST:
4535                log_debug(ls, "master copy exists %x", lkb->lkb_id);
4536                /* fall through */
4537        case 0:
4538                lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4539                break;
4540        default:
4541                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4542                          error, lkb->lkb_id);
4543        }
4544
4545        /* an ack for dlm_recover_locks() which waits for replies from
4546           all the locks it sends to new masters */
4547        dlm_recovered_lock(r);
4548 out:
4549        unlock_rsb(r);
4550        put_rsb(r);
4551        dlm_put_lkb(lkb);
4552
4553        return 0;
4554}
4555
4556int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4557                     int mode, uint32_t flags, void *name, unsigned int namelen,
4558                     unsigned long timeout_cs)
4559{
4560        struct dlm_lkb *lkb;
4561        struct dlm_args args;
4562        int error;
4563
4564        dlm_lock_recovery(ls);
4565
4566        error = create_lkb(ls, &lkb);
4567        if (error) {
4568                kfree(ua);
4569                goto out;
4570        }
4571
4572        if (flags & DLM_LKF_VALBLK) {
4573                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4574                if (!ua->lksb.sb_lvbptr) {
4575                        kfree(ua);
4576                        __put_lkb(ls, lkb);
4577                        error = -ENOMEM;
4578                        goto out;
4579                }
4580        }
4581
4582        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4583           When DLM_IFL_USER is set, the dlm knows that this is a userspace
4584           lock and that lkb_astparam is the dlm_user_args structure. */
4585
4586        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4587                              fake_astfn, ua, fake_bastfn, &args);
4588        lkb->lkb_flags |= DLM_IFL_USER;
4589        ua->old_mode = DLM_LOCK_IV;
4590
4591        if (error) {
4592                __put_lkb(ls, lkb);
4593                goto out;
4594        }
4595
4596        error = request_lock(ls, lkb, name, namelen, &args);
4597
4598        switch (error) {
4599        case 0:
4600                break;
4601        case -EINPROGRESS:
4602                error = 0;
4603                break;
4604        case -EAGAIN:
4605                error = 0;
4606                /* fall through */
4607        default:
4608                __put_lkb(ls, lkb);
4609                goto out;
4610        }
4611
4612        /* add this new lkb to the per-process list of locks */
4613        spin_lock(&ua->proc->locks_spin);
4614        hold_lkb(lkb);
4615        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4616        spin_unlock(&ua->proc->locks_spin);
4617 out:
4618        dlm_unlock_recovery(ls);
4619        return error;
4620}
4621
4622int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4623                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4624                     unsigned long timeout_cs)
4625{
4626        struct dlm_lkb *lkb;
4627        struct dlm_args args;
4628        struct dlm_user_args *ua;
4629        int error;
4630
4631        dlm_lock_recovery(ls);
4632
4633        error = find_lkb(ls, lkid, &lkb);
4634        if (error)
4635                goto out;
4636
4637        /* user can change the params on its lock when it converts it, or
4638           add an lvb that didn't exist before */
4639
4640        ua = lkb->lkb_ua;
4641
4642        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4643                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4644                if (!ua->lksb.sb_lvbptr) {
4645                        error = -ENOMEM;
4646                        goto out_put;
4647                }
4648        }
4649        if (lvb_in && ua->lksb.sb_lvbptr)
4650                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4651
4652        ua->xid = ua_tmp->xid;
4653        ua->castparam = ua_tmp->castparam;
4654        ua->castaddr = ua_tmp->castaddr;
4655        ua->bastparam = ua_tmp->bastparam;
4656        ua->bastaddr = ua_tmp->bastaddr;
4657        ua->user_lksb = ua_tmp->user_lksb;
4658        ua->old_mode = lkb->lkb_grmode;
4659
4660        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4661                              fake_astfn, ua, fake_bastfn, &args);
4662        if (error)
4663                goto out_put;
4664
4665        error = convert_lock(ls, lkb, &args);
4666
4667        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4668                error = 0;
4669 out_put:
4670        dlm_put_lkb(lkb);
4671 out:
4672        dlm_unlock_recovery(ls);
4673        kfree(ua_tmp);
4674        return error;
4675}
4676
4677int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4678                    uint32_t flags, uint32_t lkid, char *lvb_in)
4679{
4680        struct dlm_lkb *lkb;
4681        struct dlm_args args;
4682        struct dlm_user_args *ua;
4683        int error;
4684
4685        dlm_lock_recovery(ls);
4686
4687        error = find_lkb(ls, lkid, &lkb);
4688        if (error)
4689                goto out;
4690
4691        ua = lkb->lkb_ua;
4692
4693        if (lvb_in && ua->lksb.sb_lvbptr)
4694                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4695        if (ua_tmp->castparam)
4696                ua->castparam = ua_tmp->castparam;
4697        ua->user_lksb = ua_tmp->user_lksb;
4698
4699        error = set_unlock_args(flags, ua, &args);
4700        if (error)
4701                goto out_put;
4702
4703        error = unlock_lock(ls, lkb, &args);
4704
4705        if (error == -DLM_EUNLOCK)
4706                error = 0;
4707        /* from validate_unlock_args() */
4708        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4709                error = 0;
4710        if (error)
4711                goto out_put;
4712
4713        spin_lock(&ua->proc->locks_spin);
4714        /* dlm_user_add_ast() may have already taken lkb off the proc list */
4715        if (!list_empty(&lkb->lkb_ownqueue))
4716                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4717        spin_unlock(&ua->proc->locks_spin);
4718 out_put:
4719        dlm_put_lkb(lkb);
4720 out:
4721        dlm_unlock_recovery(ls);
4722        kfree(ua_tmp);
4723        return error;
4724}
4725
4726int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4727                    uint32_t flags, uint32_t lkid)
4728{
4729        struct dlm_lkb *lkb;
4730        struct dlm_args args;
4731        struct dlm_user_args *ua;
4732        int error;
4733
4734        dlm_lock_recovery(ls);
4735
4736        error = find_lkb(ls, lkid, &lkb);
4737        if (error)
4738                goto out;
4739
4740        ua = lkb->lkb_ua;
4741        if (ua_tmp->castparam)
4742                ua->castparam = ua_tmp->castparam;
4743        ua->user_lksb = ua_tmp->user_lksb;
4744
4745        error = set_unlock_args(flags, ua, &args);
4746        if (error)
4747                goto out_put;
4748
4749        error = cancel_lock(ls, lkb, &args);
4750
4751        if (error == -DLM_ECANCEL)
4752                error = 0;
4753        /* from validate_unlock_args() */
4754        if (error == -EBUSY)
4755                error = 0;
4756 out_put:
4757        dlm_put_lkb(lkb);
4758 out:
4759        dlm_unlock_recovery(ls);
4760        kfree(ua_tmp);
4761        return error;
4762}
4763
4764int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4765{
4766        struct dlm_lkb *lkb;
4767        struct dlm_args args;
4768        struct dlm_user_args *ua;
4769        struct dlm_rsb *r;
4770        int error;
4771
4772        dlm_lock_recovery(ls);
4773
4774        error = find_lkb(ls, lkid, &lkb);
4775        if (error)
4776                goto out;
4777
4778        ua = lkb->lkb_ua;
4779
4780        error = set_unlock_args(flags, ua, &args);
4781        if (error)
4782                goto out_put;
4783
4784        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4785
4786        r = lkb->lkb_resource;
4787        hold_rsb(r);
4788        lock_rsb(r);
4789
4790        error = validate_unlock_args(lkb, &args);
4791        if (error)
4792                goto out_r;
4793        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4794
4795        error = _cancel_lock(r, lkb);
4796 out_r:
4797        unlock_rsb(r);
4798        put_rsb(r);
4799
4800        if (error == -DLM_ECANCEL)
4801                error = 0;
4802        /* from validate_unlock_args() */
4803        if (error == -EBUSY)
4804                error = 0;
4805 out_put:
4806        dlm_put_lkb(lkb);
4807 out:
4808        dlm_unlock_recovery(ls);
4809        return error;
4810}
4811
4812/* lkb's that are removed from the waiters list by revert are just left on the
4813   orphans list with the granted orphan locks, to be freed by purge */
4814
4815static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4816{
4817        struct dlm_args args;
4818        int error;
4819
4820        hold_lkb(lkb);
4821        mutex_lock(&ls->ls_orphans_mutex);
4822        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4823        mutex_unlock(&ls->ls_orphans_mutex);
4824
4825        set_unlock_args(0, lkb->lkb_ua, &args);
4826
4827        error = cancel_lock(ls, lkb, &args);
4828        if (error == -DLM_ECANCEL)
4829                error = 0;
4830        return error;
4831}
4832
4833/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4834   Regardless of what rsb queue the lock is on, it's removed and freed. */
4835
4836static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4837{
4838        struct dlm_args args;
4839        int error;
4840
4841        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4842
4843        error = unlock_lock(ls, lkb, &args);
4844        if (error == -DLM_EUNLOCK)
4845                error = 0;
4846        return error;
4847}
4848
4849/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4850   (which does lock_rsb) due to deadlock with receiving a message that does
4851   lock_rsb followed by dlm_user_add_ast() */
4852
4853static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4854                                     struct dlm_user_proc *proc)
4855{
4856        struct dlm_lkb *lkb = NULL;
4857
4858        mutex_lock(&ls->ls_clear_proc_locks);
4859        if (list_empty(&proc->locks))
4860                goto out;
4861
4862        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4863        list_del_init(&lkb->lkb_ownqueue);
4864
4865        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4866                lkb->lkb_flags |= DLM_IFL_ORPHAN;
4867        else
4868                lkb->lkb_flags |= DLM_IFL_DEAD;
4869 out:
4870        mutex_unlock(&ls->ls_clear_proc_locks);
4871        return lkb;
4872}
4873
/* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
   1) references lkb->lkb_ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */
4877
/* The proc CLOSING flag is set so no more device_reads should look at the
   proc->asts list, and no more device_writes should add lkb's to the
   proc->locks list; so we shouldn't need to take asts_spin or locks_spin
   here.  This assumes that device reads/writes/closes are serialized --
   FIXME: we may need to serialize them ourselves. */
4883
4884void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4885{
4886        struct dlm_lkb *lkb, *safe;
4887
4888        dlm_lock_recovery(ls);
4889
4890        while (1) {
4891                lkb = del_proc_lock(ls, proc);
4892                if (!lkb)
4893                        break;
4894                del_timeout(lkb);
4895                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4896                        orphan_proc_lock(ls, lkb);
4897                else
4898                        unlock_proc_lock(ls, lkb);
4899
4900                /* this removes the reference for the proc->locks list
4901                   added by dlm_user_request, it may result in the lkb
4902                   being freed */
4903
4904                dlm_put_lkb(lkb);
4905        }
4906
4907        mutex_lock(&ls->ls_clear_proc_locks);
4908
4909        /* in-progress unlocks */
4910        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4911                list_del_init(&lkb->lkb_ownqueue);
4912                lkb->lkb_flags |= DLM_IFL_DEAD;
4913                dlm_put_lkb(lkb);
4914        }
4915
4916        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4917                lkb->lkb_ast_type = 0;
4918                list_del(&lkb->lkb_astqueue);
4919                dlm_put_lkb(lkb);
4920        }
4921
4922        mutex_unlock(&ls->ls_clear_proc_locks);
4923        dlm_unlock_recovery(ls);
4924}
4925
4926static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4927{
4928        struct dlm_lkb *lkb, *safe;
4929
4930        while (1) {
4931                lkb = NULL;
4932                spin_lock(&proc->locks_spin);
4933                if (!list_empty(&proc->locks)) {
4934                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
4935                                         lkb_ownqueue);
4936                        list_del_init(&lkb->lkb_ownqueue);
4937                }
4938                spin_unlock(&proc->locks_spin);
4939
4940                if (!lkb)
4941                        break;
4942
4943                lkb->lkb_flags |= DLM_IFL_DEAD;
4944                unlock_proc_lock(ls, lkb);
4945                dlm_put_lkb(lkb); /* ref from proc->locks list */
4946        }
4947
4948        spin_lock(&proc->locks_spin);
4949        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4950                list_del_init(&lkb->lkb_ownqueue);
4951                lkb->lkb_flags |= DLM_IFL_DEAD;
4952                dlm_put_lkb(lkb);
4953        }
4954        spin_unlock(&proc->locks_spin);
4955
4956        spin_lock(&proc->asts_spin);
4957        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4958                list_del(&lkb->lkb_astqueue);
4959                dlm_put_lkb(lkb);
4960        }
4961        spin_unlock(&proc->asts_spin);
4962}
4963
4964/* pid of 0 means purge all orphans */
4965
4966static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4967{
4968        struct dlm_lkb *lkb, *safe;
4969
4970        mutex_lock(&ls->ls_orphans_mutex);
4971        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4972                if (pid && lkb->lkb_ownpid != pid)
4973                        continue;
4974                unlock_proc_lock(ls, lkb);
4975                list_del_init(&lkb->lkb_ownqueue);
4976                dlm_put_lkb(lkb);
4977        }
4978        mutex_unlock(&ls->ls_orphans_mutex);
4979}
4980
4981static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4982{
4983        struct dlm_message *ms;
4984        struct dlm_mhandle *mh;
4985        int error;
4986
4987        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4988                                DLM_MSG_PURGE, &ms, &mh);
4989        if (error)
4990                return error;
4991        ms->m_nodeid = nodeid;
4992        ms->m_pid = pid;
4993
4994        return send_message(mh, ms);
4995}
4996
4997int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4998                   int nodeid, int pid)
4999{
5000        int error = 0;
5001
5002        if (nodeid != dlm_our_nodeid()) {
5003                error = send_purge(ls, nodeid, pid);
5004        } else {
5005                dlm_lock_recovery(ls);
5006                if (pid == current->pid)
5007                        purge_proc_locks(ls, proc);
5008                else
5009                        do_purge(ls, nodeid, pid);
5010                dlm_unlock_recovery(ls);
5011        }
5012        return error;
5013}
5014
5015
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.