linux/fs/gfs2/lock_dlm.c
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright 2004-2011 Red Hat, Inc.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include <linux/fs.h>
#include <linux/dlm.h>
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/delay.h>
#include <linux/gfs2_ondisk.h>

#include "incore.h"
#include "glock.h"
#include "util.h"
#include "sys.h"
#include "trace_gfs2.h"

extern struct workqueue_struct *gfs2_control_wq;

/**
 * gfs2_update_stats - Update time based stats
 * @s: Pointer to the mean/variance structure to update
 * @index: Index of the smoothed mean within the stats array
 * @sample: New data to include
 *
 * @delta is the difference between the current rtt sample and the
 * running average srtt. We add 1/8 of that to the srtt in order to
 * update the current srtt estimate. The variance estimate is a bit
 * more complicated. We subtract the current variance estimate from
 * the absolute value of @delta and add 1/4 of that difference to the
 * running total.
 *
 * Note that @index points at the array entry containing the smoothed
 * mean value, and the variance is always in the following entry.
 *
 * Reference: TCP/IP Illustrated, vol 2, pp. 831-832
 * All times are in units of integer nanoseconds. Unlike the TCP/IP case,
 * they are not scaled fixed point.
 */

static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
                                     s64 sample)
{
        s64 delta = sample - s->stats[index];
        s->stats[index] += (delta >> 3);
        index++;
        s->stats[index] += ((abs64(delta) - s->stats[index]) >> 2);
}
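
As an aside, the update rule above is easy to play with in user space. The
following is a minimal illustrative sketch (not part of this file; the
demo_update() helper and the sample values are hypothetical) that mirrors the
arithmetic of gfs2_update_stats() and prints how the smoothed mean and
variance estimates converge:

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

/* stats[0] holds the smoothed mean (srtt), stats[1] the variance
 * estimate, mirroring the layout described in the comment above. */
static void demo_update(int64_t stats[2], int64_t sample)
{
        int64_t delta = sample - stats[0];

        stats[0] += delta >> 3;                     /* srtt += delta / 8 */
        stats[1] += (llabs(delta) - stats[1]) >> 2; /* var += (|delta| - var) / 4 */
}

int main(void)
{
        int64_t stats[2] = { 0, 0 };
        int64_t samples[] = { 8000, 8800, 7600, 12000 }; /* made-up rtts, in ns */
        size_t i;

        for (i = 0; i < sizeof(samples) / sizeof(samples[0]); i++) {
                demo_update(stats, samples[i]);
                printf("sample %lld -> srtt %lld var %lld\n",
                       (long long)samples[i], (long long)stats[0],
                       (long long)stats[1]);
        }
        return 0;
}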

/**
 * gfs2_update_reply_times - Update locking statistics
 * @gl: The glock to update
 *
 * This assumes that gl->gl_dstamp has been set earlier.
 *
 * The rtt (lock round trip time) is an estimate of the time
 * taken to perform a dlm lock request. We update it on each
 * reply from the dlm.
 *
 * The blocking flag is set on the glock for all dlm requests
 * which may potentially block due to lock requests from other nodes.
 * DLM requests where the current lock state is exclusive, where the
 * requested state is null (or unlocked), or where the TRY or
 * TRY_1CB flags are set are classified as non-blocking. All
 * other DLM requests are counted as (potentially) blocking.
 */
static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
{
        struct gfs2_pcpu_lkstats *lks;
        const unsigned gltype = gl->gl_name.ln_type;
        unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
                         GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
        s64 rtt;

        preempt_disable();
        rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
        lks = this_cpu_ptr(gl->gl_sbd->sd_lkstats);
        gfs2_update_stats(&gl->gl_stats, index, rtt);           /* Local */
        gfs2_update_stats(&lks->lkstats[gltype], index, rtt);   /* Global */
        preempt_enable();

        trace_gfs2_glock_lock_time(gl, rtt);
}

/**
 * gfs2_update_request_times - Update locking statistics
 * @gl: The glock to update
 *
 * The irt (lock inter-request times) measures the average time
 * between requests to the dlm. It is updated immediately before
 * each dlm call.
 */

static inline void gfs2_update_request_times(struct gfs2_glock *gl)
{
        struct gfs2_pcpu_lkstats *lks;
        const unsigned gltype = gl->gl_name.ln_type;
        ktime_t dstamp;
        s64 irt;

        preempt_disable();
        dstamp = gl->gl_dstamp;
        gl->gl_dstamp = ktime_get_real();
        irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
        lks = this_cpu_ptr(gl->gl_sbd->sd_lkstats);
        gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);           /* Local */
        gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);   /* Global */
        preempt_enable();
}

static void gdlm_ast(void *arg)
{
        struct gfs2_glock *gl = arg;
        unsigned ret = gl->gl_state;

        gfs2_update_reply_times(gl);
        BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);

        if (gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID)
                memset(gl->gl_lvb, 0, GDLM_LVB_SIZE);

        switch (gl->gl_lksb.sb_status) {
        case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
                gfs2_glock_free(gl);
                return;
        case -DLM_ECANCEL: /* Cancel while getting lock */
                ret |= LM_OUT_CANCELED;
                goto out;
        case -EAGAIN: /* Try lock fails */
        case -EDEADLK: /* Deadlock detected */
                goto out;
        case -ETIMEDOUT: /* Canceled due to timeout */
                ret |= LM_OUT_ERROR;
                goto out;
        case 0: /* Success */
                break;
        default: /* Something unexpected */
                BUG();
        }

        ret = gl->gl_req;
        if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
                if (gl->gl_req == LM_ST_SHARED)
                        ret = LM_ST_DEFERRED;
                else if (gl->gl_req == LM_ST_DEFERRED)
                        ret = LM_ST_SHARED;
                else
                        BUG();
        }

        set_bit(GLF_INITIAL, &gl->gl_flags);
        gfs2_glock_complete(gl, ret);
        return;
out:
        if (!test_bit(GLF_INITIAL, &gl->gl_flags))
                gl->gl_lksb.sb_lkid = 0;
        gfs2_glock_complete(gl, ret);
}

static void gdlm_bast(void *arg, int mode)
{
        struct gfs2_glock *gl = arg;

        switch (mode) {
        case DLM_LOCK_EX:
                gfs2_glock_cb(gl, LM_ST_UNLOCKED);
                break;
        case DLM_LOCK_CW:
                gfs2_glock_cb(gl, LM_ST_DEFERRED);
                break;
        case DLM_LOCK_PR:
                gfs2_glock_cb(gl, LM_ST_SHARED);
                break;
        default:
                printk(KERN_ERR "unknown bast mode %d\n", mode);
                BUG();
        }
}

/* convert gfs lock-state to dlm lock-mode */

static int make_mode(const unsigned int lmstate)
{
        switch (lmstate) {
        case LM_ST_UNLOCKED:
                return DLM_LOCK_NL;
        case LM_ST_EXCLUSIVE:
                return DLM_LOCK_EX;
        case LM_ST_DEFERRED:
                return DLM_LOCK_CW;
        case LM_ST_SHARED:
                return DLM_LOCK_PR;
        }
        printk(KERN_ERR "unknown LM state %d\n", lmstate);
        BUG();
        return -1;
}

static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
                      const int req)
{
        u32 lkf = DLM_LKF_VALBLK;
        u32 lkid = gl->gl_lksb.sb_lkid;

        if (gfs_flags & LM_FLAG_TRY)
                lkf |= DLM_LKF_NOQUEUE;

        if (gfs_flags & LM_FLAG_TRY_1CB) {
                lkf |= DLM_LKF_NOQUEUE;
                lkf |= DLM_LKF_NOQUEUEBAST;
        }

        if (gfs_flags & LM_FLAG_PRIORITY) {
                lkf |= DLM_LKF_NOORDER;
                lkf |= DLM_LKF_HEADQUE;
        }

        if (gfs_flags & LM_FLAG_ANY) {
                if (req == DLM_LOCK_PR)
                        lkf |= DLM_LKF_ALTCW;
                else if (req == DLM_LOCK_CW)
                        lkf |= DLM_LKF_ALTPR;
                else
                        BUG();
        }

        if (lkid != 0) {
                lkf |= DLM_LKF_CONVERT;
                if (test_bit(GLF_BLOCKING, &gl->gl_flags))
                        lkf |= DLM_LKF_QUECVT;
        }

        return lkf;
}

static void gfs2_reverse_hex(char *c, u64 value)
{
        while (value) {
                *c-- = hex_asc[value & 0x0f];
                value >>= 4;
        }
}

static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
                     unsigned int flags)
{
        struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
        int req;
        u32 lkf;
        char strname[GDLM_STRNAME_BYTES] = "";

        req = make_mode(req_state);
        lkf = make_flags(gl, flags, req);
        gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
        if (gl->gl_lksb.sb_lkid) {
                gfs2_update_request_times(gl);
        } else {
                memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
                strname[GDLM_STRNAME_BYTES - 1] = '\0';
                gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
                gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
                gl->gl_dstamp = ktime_get_real();
        }
        /*
         * Submit the actual lock request.
         */

        return dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
                        GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
}
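
For reference, the resource name built above is a fixed-width, space-padded
hex string: the lock type is right-justified so that it ends at offset 7, and
the lock number so that it ends at offset 23. A minimal user-space sketch of
the same formatting (illustrative only; it assumes GDLM_STRNAME_BYTES is 25,
as defined in incore.h, and demo_reverse_hex() copies the helper above):

#include <stdio.h>
#include <string.h>
#include <stdint.h>

#define DEMO_STRNAME_BYTES 25 /* assumed value of GDLM_STRNAME_BYTES */

static const char hex_asc[] = "0123456789abcdef";

/* Same backwards fill as gfs2_reverse_hex(): writes the hex digits of
 * value ending at *c, moving left; writes nothing for a zero value. */
static void demo_reverse_hex(char *c, uint64_t value)
{
        while (value) {
                *c-- = hex_asc[value & 0x0f];
                value >>= 4;
        }
}

int main(void)
{
        char strname[DEMO_STRNAME_BYTES];

        memset(strname, ' ', DEMO_STRNAME_BYTES - 1);
        strname[DEMO_STRNAME_BYTES - 1] = '\0';
        demo_reverse_hex(strname + 7, 2);       /* ln_type, e.g. an inode glock */
        demo_reverse_hex(strname + 23, 0x1234); /* ln_number */
        printf("\"%s\"\n", strname);            /* "       2            1234" */
        return 0;
}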

static void gdlm_put_lock(struct gfs2_glock *gl)
{
        struct gfs2_sbd *sdp = gl->gl_sbd;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;

        if (gl->gl_lksb.sb_lkid == 0) {
                gfs2_glock_free(gl);
                return;
        }

        clear_bit(GLF_BLOCKING, &gl->gl_flags);
        gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_update_request_times(gl);
        error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
                           NULL, gl);
        if (error) {
                printk(KERN_ERR "gdlm_unlock %x,%llx err=%d\n",
                       gl->gl_name.ln_type,
                       (unsigned long long)gl->gl_name.ln_number, error);
                return;
        }
}

static void gdlm_cancel(struct gfs2_glock *gl)
{
        struct lm_lockstruct *ls = &gl->gl_sbd->sd_lockstruct;
        dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
}

/*
 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 *
 *  1. dlm_controld sees lockspace members change
 *  2. dlm_controld blocks dlm-kernel locking activity
 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 *  4. dlm_controld starts and finishes its own user level recovery
 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 *  7. dlm_recoverd does its own lock recovery
 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 * 15. gfs2_control unblocks normal locking when all journals are recovered
 *
 * - failures during recovery
 *
 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 * recovering for a prior failure.  gfs2_control needs a way to detect
 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 * the recover_block and recover_start values.
 *
 * recover_done() provides a new lockspace generation number each time it
 * is called (step 9).  This generation number is saved as recover_start.
 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 * recover_block = recover_start.  So, while recover_block is equal to
 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 *
 * - more specific gfs2 steps in sequence above
 *
 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 *  6. recover_slot records any failed jids (maybe none)
 *  9. recover_done sets recover_start = new generation number
 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 * 12. gfs2_recover does journal recoveries for failed jids identified above
 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 *     again) and if so does nothing; otherwise, if recover_start >
 *     recover_block, it clears BLOCK_LOCKS.
 *
 * - parallel recovery steps across all nodes
 *
 * All nodes attempt to update the control_lock lvb with the new generation
 * number and jid bits, but only the first to get the control_lock EX will
 * do so; others will see that it's already done (lvb already contains new
 * generation number.)
 *
 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 * . One node gets control_lock first and writes the lvb, others see it's done
 * . All nodes attempt to recover jids for which they see control_lock bits set
 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 * . All nodes will eventually see all lvb bits clear and unblock locks
 *
 * - is there a problem with clearing an lvb bit that should be set
 *   and missing a journal recovery?
 *
 * 1. jid fails
 * 2. lvb bit set for step 1
 * 3. jid recovered for step 1
 * 4. jid taken again (new mount)
 * 5. jid fails (for step 4)
 * 6. lvb bit set for step 5 (will already be set)
 * 7. lvb bit cleared for step 3
 *
 * This is not a problem because the failure in step 5 does not
 * require recovery, because the mount in step 4 could not have
 * progressed far enough to unblock locks and access the fs.  The
 * control_mount() function waits for all recoveries to be complete
 * for the latest lockspace generation before ever unblocking locks
 * and returning.  The mount in step 4 waits until the recovery in
 * step 1 is done.
 *
 * - special case of first mounter: first node to mount the fs
 *
 * The first node to mount a gfs2 fs needs to check all the journals
 * and recover any that need recovery before other nodes are allowed
 * to mount the fs.  (Others may begin mounting, but they must wait
 * for the first mounter to be done before taking locks on the fs
 * or accessing the fs.)  This has two parts:
 *
 * 1. The mounted_lock tells a node it's the first to mount the fs.
 * Each node holds the mounted_lock in PR while it's mounted.
 * Each node tries to acquire the mounted_lock in EX when it mounts.
 * If a node is granted the mounted_lock EX it means there are no
 * other mounted nodes (no PR locks exist), and it is the first mounter.
 * The mounted_lock is demoted to PR when first recovery is done, so
 * others will fail to get an EX lock, but will get a PR lock.
 *
 * 2. The control_lock blocks others in control_mount() while the first
 * mounter is doing first mount recovery of all journals.
 * A mounting node needs to acquire control_lock in EX mode before
 * it can proceed.  The first mounter holds control_lock in EX while doing
 * the first mount recovery, blocking mounts from other nodes, then demotes
 * control_lock to NL when it's done (others_may_mount/first_done),
 * allowing other nodes to continue mounting.
 *
 * first mounter:
 * control_lock EX/NOQUEUE success
 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 * set first=1
 * do first mounter recovery
 * mounted_lock EX->PR
 * control_lock EX->NL, write lvb generation
 *
 * other mounter:
 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters' PR)
 * mounted_lock PR/NOQUEUE success
 * read lvb generation
 * control_lock EX->NL
 * set first=0
 *
 * - mount during recovery
 *
 * If a node mounts while others are doing recovery (not first mounter),
 * the mounting node will get its initial recover_done() callback without
 * having seen any previous failures/callbacks.
 *
 * It must wait for all recoveries preceding its mount to be finished
 * before it unblocks locks.  It does this by repeating the "other mounter"
 * steps above until the lvb generation number is >= its mount generation
 * number (from initial recover_done) and all lvb bits are clear.
 *
 * - control_lock lvb format
 *
 * 4 bytes generation number: the latest dlm lockspace generation number
 * from recover_done callback.  Indicates the jid bitmap has been updated
 * to reflect all slot failures through that generation.
 * 4 bytes unused.
 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 * that jid N needs recovery.
 */
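
To make that lvb layout concrete, here is a minimal user-space sketch of the
encoding just described (illustrative only; it assumes the 32-byte
GDLM_LVB_SIZE defined in incore.h, a little-endian host for the memcpy of the
generation, and bit addressing matching the test_bit_le()/__set_bit_le()
calls used below):

#include <stdio.h>
#include <string.h>
#include <stdint.h>

#define DEMO_LVB_SIZE 32  /* assumed value of GDLM_LVB_SIZE */
#define DEMO_JID_OFFSET 8 /* 4 byte generation + 4 byte unused */

/* Little-endian bit numbering: jid N lives in byte N / 8, bit N % 8
 * of the bitmap that starts at DEMO_JID_OFFSET. */
static void demo_set_jid(unsigned char *lvb, unsigned int jid)
{
        lvb[DEMO_JID_OFFSET + jid / 8] |= 1u << (jid % 8);
}

static int demo_test_jid(const unsigned char *lvb, unsigned int jid)
{
        return !!(lvb[DEMO_JID_OFFSET + jid / 8] & (1u << (jid % 8)));
}

int main(void)
{
        unsigned char lvb[DEMO_LVB_SIZE];
        uint32_t gen = 7; /* stored little-endian, as in control_lvb_write() */

        memset(lvb, 0, sizeof(lvb));
        memcpy(lvb, &gen, sizeof(gen));
        demo_set_jid(lvb, 0); /* journals 0 and 9 need recovery */
        demo_set_jid(lvb, 9);

        printf("gen %u jid9 %d jid5 %d\n",
               gen, demo_test_jid(lvb, 9), demo_test_jid(lvb, 5));
        return 0;
}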

#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */

static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
                             char *lvb_bits)
{
        uint32_t gen;
        memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
        memcpy(&gen, lvb_bits, sizeof(uint32_t));
        *lvb_gen = le32_to_cpu(gen);
}

static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
                              char *lvb_bits)
{
        uint32_t gen;
        memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
        gen = cpu_to_le32(lvb_gen);
        memcpy(ls->ls_control_lvb, &gen, sizeof(uint32_t));
}

static int all_jid_bits_clear(char *lvb)
{
        int i;
        for (i = JID_BITMAP_OFFSET; i < GDLM_LVB_SIZE; i++) {
                if (lvb[i])
                        return 0;
        }
        return 1;
}

static void sync_wait_cb(void *arg)
{
        struct lm_lockstruct *ls = arg;
        complete(&ls->ls_sync_wait);
}

static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;

        error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
        if (error) {
                fs_err(sdp, "%s lkid %x error %d\n",
                       name, lksb->sb_lkid, error);
                return error;
        }

        wait_for_completion(&ls->ls_sync_wait);

        if (lksb->sb_status != -DLM_EUNLOCK) {
                fs_err(sdp, "%s lkid %x status %d\n",
                       name, lksb->sb_lkid, lksb->sb_status);
                return -1;
        }
        return 0;
}

static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
                     unsigned int num, struct dlm_lksb *lksb, char *name)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char strname[GDLM_STRNAME_BYTES];
        int error, status;

        memset(strname, 0, GDLM_STRNAME_BYTES);
        snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);

        error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
                         strname, GDLM_STRNAME_BYTES - 1,
                         0, sync_wait_cb, ls, NULL);
        if (error) {
                fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
                       name, lksb->sb_lkid, flags, mode, error);
                return error;
        }

        wait_for_completion(&ls->ls_sync_wait);

        status = lksb->sb_status;

        if (status && status != -EAGAIN) {
                fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
                       name, lksb->sb_lkid, flags, mode, status);
        }

        return status;
}

static int mounted_unlock(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
}

static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
                         &ls->ls_mounted_lksb, "mounted_lock");
}

static int control_unlock(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
}

static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
                         &ls->ls_control_lksb, "control_lock");
}

static void gfs2_control_func(struct work_struct *work)
{
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char lvb_bits[GDLM_LVB_SIZE];
        uint32_t block_gen, start_gen, lvb_gen, flags;
        int recover_set = 0;
        int write_lvb = 0;
        int recover_size;
        int i, error;

        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
         * will set this flag, after which this thread will take over
         * all further clearing of BLOCK_LOCKS.
         *
         * FIRST_MOUNT means this node is doing first mounter recovery,
         * for which recovery control is handled by
         * control_mount()/control_first_done(), not this thread.
         */
        if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
             test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        block_gen = ls->ls_recover_block;
        start_gen = ls->ls_recover_start;
        spin_unlock(&ls->ls_recover_spin);

        /*
         * Equal block_gen and start_gen implies we are between
         * recover_prep and recover_done callbacks, which means
         * dlm recovery is in progress and dlm locking is blocked.
         * There's no point trying to do any work until recover_done.
         */

        if (block_gen == start_gen)
                return;

        /*
         * Propagate recover_submit[] and recover_result[] to lvb:
         * dlm_recoverd adds to recover_submit[] jids needing recovery
         * gfs2_recover adds to recover_result[] journal recovery results
         *
         * set lvb bit for jids in recover_submit[] if the lvb has not
         * yet been updated for the generation of the failure
         *
         * clear lvb bit for jids in recover_result[] if the result of
         * the journal recovery is SUCCESS
         */

        error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
        if (error) {
                fs_err(sdp, "control lock EX error %d\n", error);
                return;
        }

        control_lvb_read(ls, &lvb_gen, lvb_bits);

        spin_lock(&ls->ls_recover_spin);
        if (block_gen != ls->ls_recover_block ||
            start_gen != ls->ls_recover_start) {
                fs_info(sdp, "recover generation %u block1 %u %u\n",
                        start_gen, block_gen, ls->ls_recover_block);
                spin_unlock(&ls->ls_recover_spin);
                control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
                return;
        }

        recover_size = ls->ls_recover_size;

        if (lvb_gen <= start_gen) {
                /*
                 * Clear lvb bits for jids we've successfully recovered.
                 * Because all nodes attempt to recover failed journals,
                 * a journal can be recovered multiple times successfully
                 * in succession.  Only the first will really do recovery,
                 * the others find it clean, but still report a successful
                 * recovery.  So, another node may have already recovered
                 * the jid and cleared the lvb bit for it.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
                                continue;

                        ls->ls_recover_result[i] = 0;

                        if (!test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET))
                                continue;

                        __clear_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
                        write_lvb = 1;
                }
        }

        if (lvb_gen == start_gen) {
                /*
                 * Failed slots before start_gen are already set in lvb.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (!ls->ls_recover_submit[i])
                                continue;
                        if (ls->ls_recover_submit[i] < lvb_gen)
                                ls->ls_recover_submit[i] = 0;
                }
        } else if (lvb_gen < start_gen) {
                /*
                 * Failed slots before start_gen are not yet set in lvb.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (!ls->ls_recover_submit[i])
                                continue;
                        if (ls->ls_recover_submit[i] < start_gen) {
                                ls->ls_recover_submit[i] = 0;
                                __set_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
                        }
                }
                /* even if there are no bits to set, we need to write the
                   latest generation to the lvb */
                write_lvb = 1;
        } else {
                /*
                 * we should be getting a recover_done() for lvb_gen soon
                 */
        }
        spin_unlock(&ls->ls_recover_spin);

        if (write_lvb) {
                control_lvb_write(ls, start_gen, lvb_bits);
                flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
        } else {
                flags = DLM_LKF_CONVERT;
        }

        error = control_lock(sdp, DLM_LOCK_NL, flags);
        if (error) {
                fs_err(sdp, "control lock NL error %d\n", error);
                return;
        }

        /*
         * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
         * and clear a jid bit in the lvb if the recovery is a success.
         * Eventually all journals will be recovered, all jid bits will
         * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
         */

        for (i = 0; i < recover_size; i++) {
                if (test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET)) {
                        fs_info(sdp, "recover generation %u jid %d\n",
                                start_gen, i);
                        gfs2_recover_set(sdp, i);
                        recover_set++;
                }
        }
        if (recover_set)
                return;

        /*
         * No more jid bits set in lvb, all recovery is done, unblock locks
         * (unless a new recover_prep callback has occurred, blocking locks
         * again while working above)
         */

        spin_lock(&ls->ls_recover_spin);
        if (ls->ls_recover_block == block_gen &&
            ls->ls_recover_start == start_gen) {
                clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "recover generation %u done\n", start_gen);
                gfs2_glock_thaw(sdp);
        } else {
                fs_info(sdp, "recover generation %u block2 %u %u\n",
                        start_gen, block_gen, ls->ls_recover_block);
                spin_unlock(&ls->ls_recover_spin);
        }
}

static int control_mount(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char lvb_bits[GDLM_LVB_SIZE];
        uint32_t start_gen, block_gen, mount_gen, lvb_gen;
        int mounted_mode;
        int retries = 0;
        int error;

        memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
        memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
        memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
        ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
        init_completion(&ls->ls_sync_wait);

        set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
        if (error) {
                fs_err(sdp, "control_mount control_lock NL error %d\n", error);
                return error;
        }

        error = mounted_lock(sdp, DLM_LOCK_NL, 0);
        if (error) {
                fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
                control_unlock(sdp);
                return error;
        }
        mounted_mode = DLM_LOCK_NL;

restart:
        if (retries++ && signal_pending(current)) {
                error = -EINTR;
                goto fail;
        }

        /*
         * We always start with both locks in NL. control_lock is
         * demoted to NL below so we don't need to do it here.
         */

        if (mounted_mode != DLM_LOCK_NL) {
                error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
                if (error)
                        goto fail;
                mounted_mode = DLM_LOCK_NL;
        }

        /*
         * Other nodes need to do some work in dlm recovery and gfs2_control
         * before the recover_done and control_lock will be ready for us below.
         * A delay here is not required but often avoids having to retry.
         */

        msleep_interruptible(500);

        /*
         * Acquire control_lock in EX and mounted_lock in either EX or PR.
         * control_lock lvb keeps track of any pending journal recoveries.
         * mounted_lock indicates if any other nodes have the fs mounted.
         */

        error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
        if (error == -EAGAIN) {
                goto restart;
        } else if (error) {
                fs_err(sdp, "control_mount control_lock EX error %d\n", error);
                goto fail;
        }

        error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
        if (!error) {
                mounted_mode = DLM_LOCK_EX;
                goto locks_done;
        } else if (error != -EAGAIN) {
                fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
                goto fail;
        }

        error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
        if (!error) {
                mounted_mode = DLM_LOCK_PR;
                goto locks_done;
        } else {
                /* not even -EAGAIN should happen here */
                fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
                goto fail;
        }

locks_done:
        /*
         * If we got both locks above in EX, then we're the first mounter.
         * If not, then we need to wait for the control_lock lvb to be
         * updated by other mounted nodes to reflect our mount generation.
         *
         * In simple cases the first mounter will see a zero lvb_gen, but if
         * all existing nodes leave or fail before the mounting nodes finish
         * control_mount, then all nodes will be mounting and lvb_gen will be
         * non-zero.
         */

        control_lvb_read(ls, &lvb_gen, lvb_bits);

        if (lvb_gen == 0xFFFFFFFF) {
                /* special value to force mount attempts to fail */
                fs_err(sdp, "control_mount control_lock disabled\n");
                error = -EINVAL;
                goto fail;
        }

        if (mounted_mode == DLM_LOCK_EX) {
                /* first mounter, keep both EX while doing first recovery */
                spin_lock(&ls->ls_recover_spin);
                clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
                set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
                set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
                return 0;
        }

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
        if (error)
                goto fail;

        /*
         * We are not first mounter, now we need to wait for the control_lock
         * lvb generation to be >= the generation from our first recover_done
         * and all lvb bits to be clear (no pending journal recoveries.)
         */

        if (!all_jid_bits_clear(lvb_bits)) {
                /* journals need recovery, wait until all are clear */
                fs_info(sdp, "control_mount wait for journal recovery\n");
                goto restart;
        }

        spin_lock(&ls->ls_recover_spin);
        block_gen = ls->ls_recover_block;
        start_gen = ls->ls_recover_start;
        mount_gen = ls->ls_recover_mount;

        if (lvb_gen < mount_gen) {
                /* wait for mounted nodes to update control_lock lvb to our
                   generation, which might include new recovery bits set */
                fs_info(sdp, "control_mount wait1 block %u start %u mount %u "
                        "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
                        lvb_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        if (lvb_gen != start_gen) {
                /* wait for mounted nodes to update control_lock lvb to the
                   latest recovery generation */
                fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
                        "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
                        lvb_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        if (block_gen == start_gen) {
                /* dlm recovery in progress, wait for it to finish */
                fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
                        "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
                        lvb_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
        set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
        memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
        memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
        spin_unlock(&ls->ls_recover_spin);
        return 0;

fail:
        mounted_unlock(sdp);
        control_unlock(sdp);
        return error;
}

static int dlm_recovery_wait(void *word)
{
        schedule();
        return 0;
}

static int control_first_done(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char lvb_bits[GDLM_LVB_SIZE];
        uint32_t start_gen, block_gen;
        int error;

restart:
        spin_lock(&ls->ls_recover_spin);
        start_gen = ls->ls_recover_start;
        block_gen = ls->ls_recover_block;

        if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
            !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
            !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                /* sanity check, should not happen */
                fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
                       start_gen, block_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                control_unlock(sdp);
                return -1;
        }

        if (start_gen == block_gen) {
                /*
                 * Wait for the end of a dlm recovery cycle to switch from
                 * first mounter recovery.  We can ignore any recover_slot
                 * callbacks between the recover_prep and next recover_done
                 * because we are still the first mounter and any failed nodes
                 * have not fully mounted, so they don't need recovery.
                 */
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "control_first_done wait gen %u\n", start_gen);

                wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
                            dlm_recovery_wait, TASK_UNINTERRUPTIBLE);
                goto restart;
        }

        clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
        set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
        memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
        memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
        spin_unlock(&ls->ls_recover_spin);

        memset(lvb_bits, 0, sizeof(lvb_bits));
        control_lvb_write(ls, start_gen, lvb_bits);

        error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
        if (error)
                fs_err(sdp, "control_first_done mounted PR error %d\n", error);

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
        if (error)
                fs_err(sdp, "control_first_done control NL error %d\n", error);

        return error;
}

/*
 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
 * gfs2 jids start at 0, so jid = slot - 1)
 */

#define RECOVER_SIZE_INC 16

static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
                            int num_slots)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        uint32_t *submit = NULL;
        uint32_t *result = NULL;
        uint32_t old_size, new_size;
        int i, max_jid;

        max_jid = 0;
        for (i = 0; i < num_slots; i++) {
                if (max_jid < slots[i].slot - 1)
                        max_jid = slots[i].slot - 1;
        }

        old_size = ls->ls_recover_size;

        if (old_size >= max_jid + 1)
                return 0;

        new_size = old_size + RECOVER_SIZE_INC;

        submit = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
        result = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
        if (!submit || !result) {
                kfree(submit);
                kfree(result);
                return -ENOMEM;
        }

        spin_lock(&ls->ls_recover_spin);
        memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
        memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
        kfree(ls->ls_recover_submit);
        kfree(ls->ls_recover_result);
        ls->ls_recover_submit = submit;
        ls->ls_recover_result = result;
        ls->ls_recover_size = new_size;
        spin_unlock(&ls->ls_recover_spin);
        return 0;
}

static void free_recover_size(struct lm_lockstruct *ls)
{
        kfree(ls->ls_recover_submit);
        kfree(ls->ls_recover_result);
        ls->ls_recover_submit = NULL;
        ls->ls_recover_result = NULL;
        ls->ls_recover_size = 0;
}

/* dlm calls before it does lock recovery */

static void gdlm_recover_prep(void *arg)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        spin_lock(&ls->ls_recover_spin);
        ls->ls_recover_block = ls->ls_recover_start;
        set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);

        if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
             test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
        spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_prep has been completed on all lockspace members;
   identifies slot/jid of failed member */

static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int jid = slot->slot - 1;

        spin_lock(&ls->ls_recover_spin);
        if (ls->ls_recover_size < jid + 1) {
                fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
                       jid, ls->ls_recover_block, ls->ls_recover_size);
                spin_unlock(&ls->ls_recover_spin);
                return;
        }

        if (ls->ls_recover_submit[jid]) {
                fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
                        jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
        }
        ls->ls_recover_submit[jid] = ls->ls_recover_block;
        spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_slot and after it completes lock recovery */

static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
                              int our_slot, uint32_t generation)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        /* ensure the ls jid arrays are large enough */
        set_recover_size(sdp, slots, num_slots);

        spin_lock(&ls->ls_recover_spin);
        ls->ls_recover_start = generation;

        if (!ls->ls_recover_mount) {
                ls->ls_recover_mount = generation;
                ls->ls_jid = our_slot - 1;
        }

        if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
                queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);

        clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
        smp_mb__after_clear_bit();
        wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
        spin_unlock(&ls->ls_recover_spin);
}

/* gfs2_recover thread has a journal recovery result */

static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
                                 unsigned int result)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                return;

        /* don't care about the recovery of our own journal during mount */
        if (jid == ls->ls_jid)
                return;

        spin_lock(&ls->ls_recover_spin);
        if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        if (ls->ls_recover_size < jid + 1) {
                fs_err(sdp, "recovery_result jid %d short size %d\n",
                       jid, ls->ls_recover_size);
                spin_unlock(&ls->ls_recover_spin);
                return;
        }

        fs_info(sdp, "recover jid %d result %s\n", jid,
                result == LM_RD_GAVEUP ? "busy" : "success");

        ls->ls_recover_result[jid] = result;

        /* GAVEUP means another node is recovering the journal; delay our
           next attempt to recover it, to give the other node a chance to
           finish before trying again */

        if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
                queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
                                   result == LM_RD_GAVEUP ? HZ : 0);
        spin_unlock(&ls->ls_recover_spin);
}

const struct dlm_lockspace_ops gdlm_lockspace_ops = {
        .recover_prep = gdlm_recover_prep,
        .recover_slot = gdlm_recover_slot,
        .recover_done = gdlm_recover_done,
};

static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char cluster[GFS2_LOCKNAME_LEN];
        const char *fsname;
        uint32_t flags;
        int error, ops_result;

        /*
         * initialize everything
         */

        INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
        spin_lock_init(&ls->ls_recover_spin);
        ls->ls_recover_flags = 0;
        ls->ls_recover_mount = 0;
        ls->ls_recover_start = 0;
        ls->ls_recover_block = 0;
        ls->ls_recover_size = 0;
        ls->ls_recover_submit = NULL;
        ls->ls_recover_result = NULL;

        error = set_recover_size(sdp, NULL, 0);
        if (error)
                goto fail;

        /*
         * prepare dlm_new_lockspace args
         */

        fsname = strchr(table, ':');
        if (!fsname) {
                fs_info(sdp, "no fsname found\n");
                error = -EINVAL;
                goto fail_free;
        }
        memset(cluster, 0, sizeof(cluster));
        memcpy(cluster, table, strlen(table) - strlen(fsname));
        fsname++;

        flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;

        /*
         * create/join lockspace
         */

        error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
                                  &gdlm_lockspace_ops, sdp, &ops_result,
                                  &ls->ls_dlm);
        if (error) {
                fs_err(sdp, "dlm_new_lockspace error %d\n", error);
                goto fail_free;
        }

        if (ops_result < 0) {
                /*
                 * dlm does not support ops callbacks,
                 * old dlm_controld/gfs_controld are used, try without ops.
                 */
                fs_info(sdp, "dlm lockspace ops not used\n");
                free_recover_size(ls);
                set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
                return 0;
        }

        if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
                fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
                error = -EINVAL;
                goto fail_release;
        }

        /*
         * control_mount() uses control_lock to determine first mounter,
         * and for later mounts, waits for any recoveries to be cleared.
         */

        error = control_mount(sdp);
        if (error) {
                fs_err(sdp, "mount control error %d\n", error);
                goto fail_release;
        }

        ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
        clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
        smp_mb__after_clear_bit();
        wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
        return 0;

fail_release:
        dlm_release_lockspace(ls->ls_dlm, 2);
fail_free:
        free_recover_size(ls);
fail:
        return error;
}

static void gdlm_first_done(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;

        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                return;

        error = control_first_done(sdp);
        if (error)
                fs_err(sdp, "mount first_done error %d\n", error);
}

static void gdlm_unmount(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                goto release;

        /* wait for gfs2_control_wq to be done with this mount */

        spin_lock(&ls->ls_recover_spin);
        set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
        spin_unlock(&ls->ls_recover_spin);
        flush_delayed_work_sync(&sdp->sd_control_work);

        /* mounted_lock and control_lock will be purged in dlm recovery */
release:
        if (ls->ls_dlm) {
                dlm_release_lockspace(ls->ls_dlm, 2);
                ls->ls_dlm = NULL;
        }

        free_recover_size(ls);
}

static const match_table_t dlm_tokens = {
        { Opt_jid, "jid=%d"},
        { Opt_id, "id=%d"},
        { Opt_first, "first=%d"},
        { Opt_nodir, "nodir=%d"},
        { Opt_err, NULL },
};

const struct lm_lockops gfs2_dlm_ops = {
        .lm_proto_name = "lock_dlm",
        .lm_mount = gdlm_mount,
        .lm_first_done = gdlm_first_done,
        .lm_recovery_result = gdlm_recovery_result,
        .lm_unmount = gdlm_unmount,
        .lm_put_lock = gdlm_put_lock,
        .lm_lock = gdlm_lock,
        .lm_cancel = gdlm_cancel,
        .lm_tokens = &dlm_tokens,
};