linux/fs/dlm/rcom.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "midcomms.h"
  17#include "rcom.h"
  18#include "recover.h"
  19#include "dir.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "util.h"
  24
  25static int rcom_response(struct dlm_ls *ls)
  26{
  27        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  28}
  29
  30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  31                         struct dlm_rcom **rc_ret, char *mb, int mb_len)
  32{
  33        struct dlm_rcom *rc;
  34
  35        rc = (struct dlm_rcom *) mb;
  36
  37        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  38        rc->rc_header.u.h_lockspace = ls->ls_global_id;
  39        rc->rc_header.h_nodeid = dlm_our_nodeid();
  40        rc->rc_header.h_length = mb_len;
  41        rc->rc_header.h_cmd = DLM_RCOM;
  42
  43        rc->rc_type = type;
  44
  45        spin_lock(&ls->ls_recover_lock);
  46        rc->rc_seq = ls->ls_recover_seq;
  47        spin_unlock(&ls->ls_recover_lock);
  48
  49        *rc_ret = rc;
  50}
  51
  52static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  53                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  54{
  55        int mb_len = sizeof(struct dlm_rcom) + len;
  56        struct dlm_mhandle *mh;
  57        char *mb;
  58
  59        mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
  60        if (!mh) {
  61                log_print("%s to %d type %d len %d ENOBUFS",
  62                          __func__, to_nodeid, type, len);
  63                return -ENOBUFS;
  64        }
  65
  66        _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  67        *mh_ret = mh;
  68        return 0;
  69}
  70
  71static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
  72                                 int len, struct dlm_rcom **rc_ret,
  73                                 struct dlm_msg **msg_ret)
  74{
  75        int mb_len = sizeof(struct dlm_rcom) + len;
  76        struct dlm_msg *msg;
  77        char *mb;
  78
  79        msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
  80                                   NULL, NULL);
  81        if (!msg) {
  82                log_print("create_rcom to %d type %d len %d ENOBUFS",
  83                          to_nodeid, type, len);
  84                return -ENOBUFS;
  85        }
  86
  87        _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  88        *msg_ret = msg;
  89        return 0;
  90}
  91
  92static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc)
  93{
  94        dlm_rcom_out(rc);
  95}
  96
  97static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  98                      struct dlm_rcom *rc)
  99{
 100        _send_rcom(ls, rc);
 101        dlm_midcomms_commit_mhandle(mh);
 102}
 103
 104static void send_rcom_stateless(struct dlm_ls *ls, struct dlm_msg *msg,
 105                                struct dlm_rcom *rc)
 106{
 107        _send_rcom(ls, rc);
 108        dlm_lowcomms_commit_msg(msg);
 109        dlm_lowcomms_put_msg(msg);
 110}
 111
 112static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
 113                            uint32_t flags)
 114{
 115        rs->rs_flags = cpu_to_le32(flags);
 116}
 117
 118/* When replying to a status request, a node also sends back its
 119   configuration values.  The requesting node then checks that the remote
 120   node is configured the same way as itself. */
 121
 122static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
 123                            uint32_t num_slots)
 124{
 125        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
 126        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
 127
 128        rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
 129        rf->rf_num_slots = cpu_to_le16(num_slots);
 130        rf->rf_generation =  cpu_to_le32(ls->ls_generation);
 131}
 132
 133static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 134{
 135        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
 136
 137        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
 138                log_error(ls, "version mismatch: %x nodeid %d: %x",
 139                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
 140                          rc->rc_header.h_version);
 141                return -EPROTO;
 142        }
 143
 144        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 145            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 146                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 147                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 148                          le32_to_cpu(rf->rf_lvblen),
 149                          le32_to_cpu(rf->rf_lsflags));
 150                return -EPROTO;
 151        }
 152        return 0;
 153}
 154
 155static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 156{
 157        spin_lock(&ls->ls_rcom_spin);
 158        *new_seq = ++ls->ls_rcom_seq;
 159        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 160        spin_unlock(&ls->ls_rcom_spin);
 161}
 162
 163static void disallow_sync_reply(struct dlm_ls *ls)
 164{
 165        spin_lock(&ls->ls_rcom_spin);
 166        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 167        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 168        spin_unlock(&ls->ls_rcom_spin);
 169}
 170
 171/*
 172 * low nodeid gathers one slot value at a time from each node.
 173 * it sets need_slots=0, and saves rf_our_slot returned from each
 174 * rcom_config.
 175 *
 176 * other nodes gather all slot values at once from the low nodeid.
 177 * they set need_slots=1, and ignore the rf_our_slot returned from each
 178 * rcom_config.  they use the rf_num_slots returned from the low
 179 * node's rcom_config.
 180 */
 181
 182int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 183{
 184        struct dlm_rcom *rc;
 185        struct dlm_msg *msg;
 186        int error = 0;
 187
 188        ls->ls_recover_nodeid = nodeid;
 189
 190        if (nodeid == dlm_our_nodeid()) {
 191                rc = ls->ls_recover_buf;
 192                rc->rc_result = dlm_recover_status(ls);
 193                goto out;
 194        }
 195
 196retry:
 197        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
 198                                      sizeof(struct rcom_status), &rc, &msg);
 199        if (error)
 200                goto out;
 201
 202        set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 203
 204        allow_sync_reply(ls, &rc->rc_id);
 205        memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
 206
 207        send_rcom_stateless(ls, msg, rc);
 208
 209        error = dlm_wait_function(ls, &rcom_response);
 210        disallow_sync_reply(ls);
 211        if (error == -ETIMEDOUT)
 212                goto retry;
 213        if (error)
 214                goto out;
 215
 216        rc = ls->ls_recover_buf;
 217
 218        if (rc->rc_result == -ESRCH) {
 219                /* we pretend the remote lockspace exists with 0 status */
 220                log_debug(ls, "remote node %d not ready", nodeid);
 221                rc->rc_result = 0;
 222                error = 0;
 223        } else {
 224                error = check_rcom_config(ls, rc, nodeid);
 225        }
 226
 227        /* the caller looks at rc_result for the remote recovery status */
 228 out:
 229        return error;
 230}
 231
 232static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 233{
 234        struct dlm_rcom *rc;
 235        struct rcom_status *rs;
 236        uint32_t status;
 237        int nodeid = rc_in->rc_header.h_nodeid;
 238        int len = sizeof(struct rcom_config);
 239        struct dlm_msg *msg;
 240        int num_slots = 0;
 241        int error;
 242
 243        if (!dlm_slots_version(&rc_in->rc_header)) {
 244                status = dlm_recover_status(ls);
 245                goto do_create;
 246        }
 247
 248        rs = (struct rcom_status *)rc_in->rc_buf;
 249
 250        if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 251                status = dlm_recover_status(ls);
 252                goto do_create;
 253        }
 254
 255        spin_lock(&ls->ls_recover_lock);
 256        status = ls->ls_recover_status;
 257        num_slots = ls->ls_num_slots;
 258        spin_unlock(&ls->ls_recover_lock);
 259        len += num_slots * sizeof(struct rcom_slot);
 260
 261 do_create:
 262        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 263                                      len, &rc, &msg);
 264        if (error)
 265                return;
 266
 267        rc->rc_id = rc_in->rc_id;
 268        rc->rc_seq_reply = rc_in->rc_seq;
 269        rc->rc_result = status;
 270
 271        set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 272
 273        if (!num_slots)
 274                goto do_send;
 275
 276        spin_lock(&ls->ls_recover_lock);
 277        if (ls->ls_num_slots != num_slots) {
 278                spin_unlock(&ls->ls_recover_lock);
 279                log_debug(ls, "receive_rcom_status num_slots %d to %d",
 280                          num_slots, ls->ls_num_slots);
 281                rc->rc_result = 0;
 282                set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 283                goto do_send;
 284        }
 285
 286        dlm_slots_copy_out(ls, rc);
 287        spin_unlock(&ls->ls_recover_lock);
 288
 289 do_send:
 290        send_rcom_stateless(ls, msg, rc);
 291}
 292
 293static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 294{
 295        spin_lock(&ls->ls_rcom_spin);
 296        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 297            rc_in->rc_id != ls->ls_rcom_seq) {
 298                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 299                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 300                          (unsigned long long)rc_in->rc_id,
 301                          (unsigned long long)ls->ls_rcom_seq);
 302                goto out;
 303        }
 304        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 305        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 306        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 307        wake_up(&ls->ls_wait_general);
 308 out:
 309        spin_unlock(&ls->ls_rcom_spin);
 310}
 311
 312int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 313{
 314        struct dlm_rcom *rc;
 315        struct dlm_msg *msg;
 316        int error = 0;
 317
 318        ls->ls_recover_nodeid = nodeid;
 319
 320retry:
 321        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
 322                                      &rc, &msg);
 323        if (error)
 324                goto out;
 325        memcpy(rc->rc_buf, last_name, last_len);
 326
 327        allow_sync_reply(ls, &rc->rc_id);
 328        memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
 329
 330        send_rcom_stateless(ls, msg, rc);
 331
 332        error = dlm_wait_function(ls, &rcom_response);
 333        disallow_sync_reply(ls);
 334        if (error == -ETIMEDOUT)
 335                goto retry;
 336 out:
 337        return error;
 338}
 339
 340static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 341{
 342        struct dlm_rcom *rc;
 343        int error, inlen, outlen, nodeid;
 344        struct dlm_msg *msg;
 345
 346        nodeid = rc_in->rc_header.h_nodeid;
 347        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 348        outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
 349
 350        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
 351                                      &rc, &msg);
 352        if (error)
 353                return;
 354        rc->rc_id = rc_in->rc_id;
 355        rc->rc_seq_reply = rc_in->rc_seq;
 356
 357        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 358                              nodeid);
 359        send_rcom_stateless(ls, msg, rc);
 360}
 361
 362int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 363{
 364        struct dlm_rcom *rc;
 365        struct dlm_mhandle *mh;
 366        struct dlm_ls *ls = r->res_ls;
 367        int error;
 368
 369        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 370                            &rc, &mh);
 371        if (error)
 372                goto out;
 373        memcpy(rc->rc_buf, r->res_name, r->res_length);
 374        rc->rc_id = (unsigned long) r->res_id;
 375
 376        send_rcom(ls, mh, rc);
 377 out:
 378        return error;
 379}
 380
 381static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 382{
 383        struct dlm_rcom *rc;
 384        struct dlm_mhandle *mh;
 385        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 386        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 387
 388        /* Old code would send this special id to trigger a debug dump. */
 389        if (rc_in->rc_id == 0xFFFFFFFF) {
 390                log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 391                dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 392                return;
 393        }
 394
 395        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 396        if (error)
 397                return;
 398
 399        error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 400                                  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 401        if (error)
 402                ret_nodeid = error;
 403        rc->rc_result = ret_nodeid;
 404        rc->rc_id = rc_in->rc_id;
 405        rc->rc_seq_reply = rc_in->rc_seq;
 406
 407        send_rcom(ls, mh, rc);
 408}
 409
 410static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 411{
 412        dlm_recover_master_reply(ls, rc_in);
 413}
 414
 415static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 416                           struct rcom_lock *rl)
 417{
 418        memset(rl, 0, sizeof(*rl));
 419
 420        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 421        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 422        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 423        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 424        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 425        rl->rl_rqmode = lkb->lkb_rqmode;
 426        rl->rl_grmode = lkb->lkb_grmode;
 427        rl->rl_status = lkb->lkb_status;
 428        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 429
 430        if (lkb->lkb_bastfn)
 431                rl->rl_asts |= DLM_CB_BAST;
 432        if (lkb->lkb_astfn)
 433                rl->rl_asts |= DLM_CB_CAST;
 434
 435        rl->rl_namelen = cpu_to_le16(r->res_length);
 436        memcpy(rl->rl_name, r->res_name, r->res_length);
 437
 438        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 439           If so, receive_rcom_lock_args() won't take this copy. */
 440
 441        if (lkb->lkb_lvbptr)
 442                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 443}
 444
 445int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 446{
 447        struct dlm_ls *ls = r->res_ls;
 448        struct dlm_rcom *rc;
 449        struct dlm_mhandle *mh;
 450        struct rcom_lock *rl;
 451        int error, len = sizeof(struct rcom_lock);
 452
 453        if (lkb->lkb_lvbptr)
 454                len += ls->ls_lvblen;
 455
 456        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 457        if (error)
 458                goto out;
 459
 460        rl = (struct rcom_lock *) rc->rc_buf;
 461        pack_rcom_lock(r, lkb, rl);
 462        rc->rc_id = (unsigned long) r;
 463
 464        send_rcom(ls, mh, rc);
 465 out:
 466        return error;
 467}
 468
 469/* needs at least dlm_rcom + rcom_lock */
 470static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 471{
 472        struct dlm_rcom *rc;
 473        struct dlm_mhandle *mh;
 474        int error, nodeid = rc_in->rc_header.h_nodeid;
 475
 476        dlm_recover_master_copy(ls, rc_in);
 477
 478        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 479                            sizeof(struct rcom_lock), &rc, &mh);
 480        if (error)
 481                return;
 482
 483        /* We send back the same rcom_lock struct we received, but
 484           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 485
 486        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 487        rc->rc_id = rc_in->rc_id;
 488        rc->rc_seq_reply = rc_in->rc_seq;
 489
 490        send_rcom(ls, mh, rc);
 491}
 492
 493/* If the lockspace doesn't exist then still send a status message
 494   back; it's possible that it just doesn't have its global_id yet. */
 495
 496int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 497{
 498        struct dlm_rcom *rc;
 499        struct rcom_config *rf;
 500        struct dlm_mhandle *mh;
 501        char *mb;
 502        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 503
 504        mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
 505        if (!mh)
 506                return -ENOBUFS;
 507
 508        rc = (struct dlm_rcom *) mb;
 509
 510        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 511        rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
 512        rc->rc_header.h_nodeid = dlm_our_nodeid();
 513        rc->rc_header.h_length = mb_len;
 514        rc->rc_header.h_cmd = DLM_RCOM;
 515
 516        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 517        rc->rc_id = rc_in->rc_id;
 518        rc->rc_seq_reply = rc_in->rc_seq;
 519        rc->rc_result = -ESRCH;
 520
 521        rf = (struct rcom_config *) rc->rc_buf;
 522        rf->rf_lvblen = cpu_to_le32(~0U);
 523
 524        dlm_rcom_out(rc);
 525        dlm_midcomms_commit_mhandle(mh);
 526
 527        return 0;
 528}
 529
 530/*
 531 * Ignore messages for stage Y before we set
 532 * recover_status bit for stage X:
 533 *
 534 * recover_status = 0
 535 *
 536 * dlm_recover_members()
 537 * - send nothing
 538 * - recv nothing
 539 * - ignore NAMES, NAMES_REPLY
 540 * - ignore LOOKUP, LOOKUP_REPLY
 541 * - ignore LOCK, LOCK_REPLY
 542 *
 543 * recover_status |= NODES
 544 *
 545 * dlm_recover_members_wait()
 546 *
 547 * dlm_recover_directory()
 548 * - send NAMES
 549 * - recv NAMES_REPLY
 550 * - ignore LOOKUP, LOOKUP_REPLY
 551 * - ignore LOCK, LOCK_REPLY
 552 *
 553 * recover_status |= DIR
 554 *
 555 * dlm_recover_directory_wait()
 556 *
 557 * dlm_recover_masters()
 558 * - send LOOKUP
 559 * - recv LOOKUP_REPLY
 560 *
 561 * dlm_recover_locks()
 562 * - send LOCKS
 563 * - recv LOCKS_REPLY
 564 *
 565 * recover_status |= LOCKS
 566 *
 567 * dlm_recover_locks_wait()
 568 *
 569 * recover_status |= DONE
 570 */
 571
 572/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 573   recovery-only comms are sent through here. */
 574
 575void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 576{
 577        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 578        int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 579        uint32_t status;
 580        uint64_t seq;
 581
 582        switch (rc->rc_type) {
 583        case DLM_RCOM_STATUS_REPLY:
 584                reply = 1;
 585                break;
 586        case DLM_RCOM_NAMES:
 587                names = 1;
 588                break;
 589        case DLM_RCOM_NAMES_REPLY:
 590                names = 1;
 591                reply = 1;
 592                break;
 593        case DLM_RCOM_LOOKUP:
 594                lookup = 1;
 595                break;
 596        case DLM_RCOM_LOOKUP_REPLY:
 597                lookup = 1;
 598                reply = 1;
 599                break;
 600        case DLM_RCOM_LOCK:
 601                lock = 1;
 602                break;
 603        case DLM_RCOM_LOCK_REPLY:
 604                lock = 1;
 605                reply = 1;
 606                break;
 607        }
 608
 609        spin_lock(&ls->ls_recover_lock);
 610        status = ls->ls_recover_status;
 611        stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 612        seq = ls->ls_recover_seq;
 613        spin_unlock(&ls->ls_recover_lock);
 614
 615        if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 616                goto ignore;
 617
 618        if (reply && (rc->rc_seq_reply != seq))
 619                goto ignore;
 620
 621        if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 622                goto ignore;
 623
 624        if (!(status & DLM_RS_DIR) && (lookup || lock))
 625                goto ignore;
 626
 627        switch (rc->rc_type) {
 628        case DLM_RCOM_STATUS:
 629                receive_rcom_status(ls, rc);
 630                break;
 631
 632        case DLM_RCOM_NAMES:
 633                receive_rcom_names(ls, rc);
 634                break;
 635
 636        case DLM_RCOM_LOOKUP:
 637                receive_rcom_lookup(ls, rc);
 638                break;
 639
 640        case DLM_RCOM_LOCK:
 641                if (rc->rc_header.h_length < lock_size)
 642                        goto Eshort;
 643                receive_rcom_lock(ls, rc);
 644                break;
 645
 646        case DLM_RCOM_STATUS_REPLY:
 647                receive_sync_reply(ls, rc);
 648                break;
 649
 650        case DLM_RCOM_NAMES_REPLY:
 651                receive_sync_reply(ls, rc);
 652                break;
 653
 654        case DLM_RCOM_LOOKUP_REPLY:
 655                receive_rcom_lookup_reply(ls, rc);
 656                break;
 657
 658        case DLM_RCOM_LOCK_REPLY:
 659                if (rc->rc_header.h_length < lock_size)
 660                        goto Eshort;
 661                dlm_recover_process_copy(ls, rc);
 662                break;
 663
 664        default:
 665                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 666        }
 667        return;
 668
 669ignore:
 670        log_limit(ls, "dlm_receive_rcom ignore msg %d "
 671                  "from %d %llu %llu recover seq %llu sts %x gen %u",
 672                   rc->rc_type,
 673                   nodeid,
 674                   (unsigned long long)rc->rc_seq,
 675                   (unsigned long long)rc->rc_seq_reply,
 676                   (unsigned long long)seq,
 677                   status, ls->ls_generation);
 678        return;
 679Eshort:
 680        log_error(ls, "recovery message %d from %d is too short",
 681                  rc->rc_type, nodeid);
 682}
 683
 684