linux/fs/dlm/ast.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lock.h"
  16#include "user.h"
  17
  18static uint64_t                 dlm_cb_seq;
  19static spinlock_t               dlm_cb_seq_spin;
  20
  21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
  22{
  23        int i;
  24
  25        log_print("last_bast %x %llu flags %x mode %d sb %d %x",
  26                  lkb->lkb_id,
  27                  (unsigned long long)lkb->lkb_last_bast.seq,
  28                  lkb->lkb_last_bast.flags,
  29                  lkb->lkb_last_bast.mode,
  30                  lkb->lkb_last_bast.sb_status,
  31                  lkb->lkb_last_bast.sb_flags);
  32
  33        log_print("last_cast %x %llu flags %x mode %d sb %d %x",
  34                  lkb->lkb_id,
  35                  (unsigned long long)lkb->lkb_last_cast.seq,
  36                  lkb->lkb_last_cast.flags,
  37                  lkb->lkb_last_cast.mode,
  38                  lkb->lkb_last_cast.sb_status,
  39                  lkb->lkb_last_cast.sb_flags);
  40
  41        for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
  42                log_print("cb %x %llu flags %x mode %d sb %d %x",
  43                          lkb->lkb_id,
  44                          (unsigned long long)lkb->lkb_callbacks[i].seq,
  45                          lkb->lkb_callbacks[i].flags,
  46                          lkb->lkb_callbacks[i].mode,
  47                          lkb->lkb_callbacks[i].sb_status,
  48                          lkb->lkb_callbacks[i].sb_flags);
  49        }
  50}
  51
  52int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
  53                         int status, uint32_t sbflags, uint64_t seq)
  54{
  55        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
  56        uint64_t prev_seq;
  57        int prev_mode;
  58        int i, rv;
  59
  60        for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
  61                if (lkb->lkb_callbacks[i].seq)
  62                        continue;
  63
  64                /*
  65                 * Suppress some redundant basts here, do more on removal.
  66                 * Don't even add a bast if the callback just before it
  67                 * is a bast for the same mode or a more restrictive mode.
  68                 * (the addional > PR check is needed for PR/CW inversion)
  69                 */
  70
  71                if ((i > 0) && (flags & DLM_CB_BAST) &&
  72                    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
  73
  74                        prev_seq = lkb->lkb_callbacks[i-1].seq;
  75                        prev_mode = lkb->lkb_callbacks[i-1].mode;
  76
  77                        if ((prev_mode == mode) ||
  78                            (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
  79
  80                                log_debug(ls, "skip %x add bast %llu mode %d "
  81                                          "for bast %llu mode %d",
  82                                          lkb->lkb_id,
  83                                          (unsigned long long)seq,
  84                                          mode,
  85                                          (unsigned long long)prev_seq,
  86                                          prev_mode);
  87                                rv = 0;
  88                                goto out;
  89                        }
  90                }
  91
  92                lkb->lkb_callbacks[i].seq = seq;
  93                lkb->lkb_callbacks[i].flags = flags;
  94                lkb->lkb_callbacks[i].mode = mode;
  95                lkb->lkb_callbacks[i].sb_status = status;
  96                lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
  97                rv = 0;
  98                break;
  99        }
 100
 101        if (i == DLM_CALLBACKS_SIZE) {
 102                log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
 103                          lkb->lkb_id, (unsigned long long)seq,
 104                          flags, mode, status, sbflags);
 105                dlm_dump_lkb_callbacks(lkb);
 106                rv = -1;
 107                goto out;
 108        }
 109 out:
 110        return rv;
 111}
 112
 113int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
 114                         struct dlm_callback *cb, int *resid)
 115{
 116        int i, rv;
 117
 118        *resid = 0;
 119
 120        if (!lkb->lkb_callbacks[0].seq) {
 121                rv = -ENOENT;
 122                goto out;
 123        }
 124
 125        /* oldest undelivered cb is callbacks[0] */
 126
 127        memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
 128        memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
 129
 130        /* shift others down */
 131
 132        for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
 133                if (!lkb->lkb_callbacks[i].seq)
 134                        break;
 135                memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
 136                       sizeof(struct dlm_callback));
 137                memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
 138                (*resid)++;
 139        }
 140
 141        /* if cb is a bast, it should be skipped if the blocking mode is
 142           compatible with the last granted mode */
 143
 144        if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
 145                if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
 146                        cb->flags |= DLM_CB_SKIP;
 147
 148                        log_debug(ls, "skip %x bast %llu mode %d "
 149                                  "for cast %llu mode %d",
 150                                  lkb->lkb_id,
 151                                  (unsigned long long)cb->seq,
 152                                  cb->mode,
 153                                  (unsigned long long)lkb->lkb_last_cast.seq,
 154                                  lkb->lkb_last_cast.mode);
 155                        rv = 0;
 156                        goto out;
 157                }
 158        }
 159
 160        if (cb->flags & DLM_CB_CAST) {
 161                memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
 162                lkb->lkb_last_cast_time = ktime_get();
 163        }
 164
 165        if (cb->flags & DLM_CB_BAST) {
 166                memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
 167                lkb->lkb_last_bast_time = ktime_get();
 168        }
 169        rv = 0;
 170 out:
 171        return rv;
 172}
 173
 174void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 175                uint32_t sbflags)
 176{
 177        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 178        uint64_t new_seq, prev_seq;
 179        int rv;
 180
 181        spin_lock(&dlm_cb_seq_spin);
 182        new_seq = ++dlm_cb_seq;
 183        spin_unlock(&dlm_cb_seq_spin);
 184
 185        if (lkb->lkb_flags & DLM_IFL_USER) {
 186                dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
 187                return;
 188        }
 189
 190        mutex_lock(&lkb->lkb_cb_mutex);
 191        prev_seq = lkb->lkb_callbacks[0].seq;
 192
 193        rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
 194        if (rv < 0)
 195                goto out;
 196
 197        if (!prev_seq) {
 198                kref_get(&lkb->lkb_ref);
 199
 200                if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
 201                        mutex_lock(&ls->ls_cb_mutex);
 202                        list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
 203                        mutex_unlock(&ls->ls_cb_mutex);
 204                } else {
 205                        queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
 206                }
 207        }
 208 out:
 209        mutex_unlock(&lkb->lkb_cb_mutex);
 210}
 211
 212void dlm_callback_work(struct work_struct *work)
 213{
 214        struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
 215        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 216        void (*castfn) (void *astparam);
 217        void (*bastfn) (void *astparam, int mode);
 218        struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
 219        int i, rv, resid;
 220
 221        memset(&callbacks, 0, sizeof(callbacks));
 222
 223        mutex_lock(&lkb->lkb_cb_mutex);
 224        if (!lkb->lkb_callbacks[0].seq) {
 225                /* no callback work exists, shouldn't happen */
 226                log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
 227                dlm_print_lkb(lkb);
 228                dlm_dump_lkb_callbacks(lkb);
 229        }
 230
 231        for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
 232                rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
 233                if (rv < 0)
 234                        break;
 235        }
 236
 237        if (resid) {
 238                /* cbs remain, loop should have removed all, shouldn't happen */
 239                log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
 240                          resid);
 241                dlm_print_lkb(lkb);
 242                dlm_dump_lkb_callbacks(lkb);
 243        }
 244        mutex_unlock(&lkb->lkb_cb_mutex);
 245
 246        castfn = lkb->lkb_astfn;
 247        bastfn = lkb->lkb_bastfn;
 248
 249        for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
 250                if (!callbacks[i].seq)
 251                        break;
 252                if (callbacks[i].flags & DLM_CB_SKIP) {
 253                        continue;
 254                } else if (callbacks[i].flags & DLM_CB_BAST) {
 255                        bastfn(lkb->lkb_astparam, callbacks[i].mode);
 256                } else if (callbacks[i].flags & DLM_CB_CAST) {
 257                        lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
 258                        lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
 259                        castfn(lkb->lkb_astparam);
 260                }
 261        }
 262
 263        /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
 264        dlm_put_lkb(lkb);
 265}
 266
 267int dlm_callback_start(struct dlm_ls *ls)
 268{
 269        ls->ls_callback_wq = alloc_workqueue("dlm_callback",
 270                                             WQ_UNBOUND |
 271                                             WQ_MEM_RECLAIM |
 272                                             WQ_NON_REENTRANT,
 273                                             0);
 274        if (!ls->ls_callback_wq) {
 275                log_print("can't start dlm_callback workqueue");
 276                return -ENOMEM;
 277        }
 278        return 0;
 279}
 280
 281void dlm_callback_stop(struct dlm_ls *ls)
 282{
 283        if (ls->ls_callback_wq)
 284                destroy_workqueue(ls->ls_callback_wq);
 285}
 286
 287void dlm_callback_suspend(struct dlm_ls *ls)
 288{
 289        set_bit(LSFL_CB_DELAY, &ls->ls_flags);
 290
 291        if (ls->ls_callback_wq)
 292                flush_workqueue(ls->ls_callback_wq);
 293}
 294
 295void dlm_callback_resume(struct dlm_ls *ls)
 296{
 297        struct dlm_lkb *lkb, *safe;
 298        int count = 0;
 299
 300        clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
 301
 302        if (!ls->ls_callback_wq)
 303                return;
 304
 305        mutex_lock(&ls->ls_cb_mutex);
 306        list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
 307                list_del_init(&lkb->lkb_cb_list);
 308                queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
 309                count++;
 310        }
 311        mutex_unlock(&ls->ls_cb_mutex);
 312
 313        if (count)
 314                log_debug(ls, "dlm_callback_resume %d", count);
 315}
 316
 317
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.