linux/drivers/md/persistent-data/dm-block-manager.c
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"
#include "../dm-bufio.h"

#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return -EINVAL.  A stack
 * trace is also emitted for the previous lock acquisition.
 *
 * iii) Priority is given to write locks.
 */
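
/*
 * Minimal usage sketch (illustrative only, not called anywhere in this
 * file): "bl" is a hypothetical struct block_lock owned by the caller.
 * Each down is paired with an up, and a recursive attempt by the same
 * task fails with -EINVAL instead of deadlocking:
 *
 *      int r = bl_down_read(&bl);
 *      if (r)
 *              return r;       (-EINVAL on recursive acquisition)
 *
 *      ... read the protected data ...
 *      bl_up_read(&bl);
 */
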
#define MAX_HOLDERS 4
#define MAX_STACK 10

typedef unsigned long stack_entries[MAX_STACK];

struct block_lock {
        spinlock_t lock;
        __s32 count;
        struct list_head waiters;
        struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace traces[MAX_HOLDERS];
        stack_entries entries[MAX_HOLDERS];
#endif
};

struct waiter {
        struct list_head list;
        struct task_struct *task;
        int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
                              struct task_struct *task)
{
        unsigned i;

        for (i = 0; i < MAX_HOLDERS; i++)
                if (lock->holders[i] == task)
                        break;

        BUG_ON(i == MAX_HOLDERS);
        return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        struct stack_trace *t;
#endif

        get_task_struct(task);
        lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        t = lock->traces + h;
        t->nr_entries = 0;
        t->max_entries = MAX_STACK;
        t->entries = lock->entries[h];
        t->skip = 2;
        save_stack_trace(t);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
        unsigned h = __find_holder(lock, task);
        lock->holders[h] = NULL;
        put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
        unsigned i;
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
        static struct stack_trace t;
        static stack_entries entries;
#endif

        for (i = 0; i < MAX_HOLDERS; i++) {
                if (lock->holders[i] == current) {
                        DMERR("recursive lock detected in pool metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
                        DMERR("previously held here:");
                        print_stack_trace(lock->traces + i, 4);

                        DMERR("subsequent acquisition attempted here:");
                        t.nr_entries = 0;
                        t.max_entries = MAX_STACK;
                        t.entries = entries;
                        t.skip = 3;
                        save_stack_trace(&t);
                        print_stack_trace(&t, 4);
#endif
                        return -EINVAL;
                }
        }

        return 0;
}

static void __wait(struct waiter *w)
{
        for (;;) {
                set_task_state(current, TASK_UNINTERRUPTIBLE);

                if (!w->task)
                        break;

                schedule();
        }

        set_task_state(current, TASK_RUNNING);
}

static void __wake_waiter(struct waiter *w)
{
        struct task_struct *task;

        list_del(&w->list);
        task = w->task;
        /*
         * Pick up the task before clearing w->task: the waiter struct lives
         * on the waiting task's stack, so it may disappear as soon as the
         * waiter sees NULL and returns.
         */
        smp_mb();
        w->task = NULL;
        wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
static void __wake_many(struct block_lock *lock)
{
        struct waiter *w, *tmp;

        BUG_ON(lock->count < 0);
        list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
                if (lock->count >= MAX_HOLDERS)
                        return;

                if (w->wants_write) {
                        if (lock->count > 0)
                                return; /* still read locked */

                        lock->count = -1;
                        __add_holder(lock, w->task);
                        __wake_waiter(w);
                        return;
                }

                lock->count++;
                __add_holder(lock, w->task);
                __wake_waiter(w);
        }
}

static void bl_init(struct block_lock *lock)
{
        int i;

        spin_lock_init(&lock->lock);
        lock->count = 0;
        INIT_LIST_HEAD(&lock->waiters);
        for (i = 0; i < MAX_HOLDERS; i++)
                lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
        return lock->count >= 0 &&
                lock->count < MAX_HOLDERS &&
                list_empty(&lock->waiters);
}

static int bl_down_read(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);

        w.task = current;
        w.wants_write = 0;
        list_add_tail(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);
        return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
        int r;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r)
                goto out;

        if (__available_for_read(lock)) {
                lock->count++;
                __add_holder(lock, current);
                r = 0;
        } else
                r = -EWOULDBLOCK;

out:
        spin_unlock(&lock->lock);
        return r;
}

static void bl_up_read(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        BUG_ON(lock->count <= 0);
        __del_holder(lock, current);
        --lock->count;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static int bl_down_write(struct block_lock *lock)
{
        int r;
        struct waiter w;

        spin_lock(&lock->lock);
        r = __check_holder(lock);
        if (r) {
                spin_unlock(&lock->lock);
                return r;
        }

        if (lock->count == 0 && list_empty(&lock->waiters)) {
                lock->count = -1;
                __add_holder(lock, current);
                spin_unlock(&lock->lock);
                return 0;
        }

        get_task_struct(current);
        w.task = current;
        w.wants_write = 1;

        /*
         * Writers are given priority.  We know there's only one mutator in
         * the system, so we can safely ignore the ordering reversal.
         */
        list_add(&w.list, &lock->waiters);
        spin_unlock(&lock->lock);

        __wait(&w);
        put_task_struct(current);

        return 0;
}

static void bl_up_write(struct block_lock *lock)
{
        spin_lock(&lock->lock);
        __del_holder(lock, current);
        lock->count = 0;
        if (!list_empty(&lock->waiters))
                __wake_many(lock);
        spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
        if (r == -EINVAL)
                DMERR("recursive acquisition of block %llu requested.",
                      (unsigned long long) b);
}

/*----------------------------------------------------------------*/

/*
 * Block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future.  So these structs are just cast within
 * this .c file, rather than being exposed through the public interface.
 */
static struct dm_buffer *to_buffer(struct dm_block *b)
{
        return (struct dm_buffer *) b;
}

static struct dm_bufio_client *to_bufio(struct dm_block_manager *bm)
{
        return (struct dm_bufio_client *) bm;
}

dm_block_t dm_block_location(struct dm_block *b)
{
        return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
        return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

struct buffer_aux {
        struct dm_block_validator *validator;
        struct block_lock lock;
        int write_locked;
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        aux->validator = NULL;
        bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
        struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
        if (aux->validator) {
                aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
                         dm_bufio_get_block_size(dm_bufio_get_client(buf)));
        }
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
                                                 unsigned block_size,
                                                 unsigned cache_size,
                                                 unsigned max_held_per_thread)
{
        return (struct dm_block_manager *)
                dm_bufio_client_create(bdev, block_size, max_held_per_thread,
                                       sizeof(struct buffer_aux),
                                       dm_block_manager_alloc_callback,
                                       dm_block_manager_write_callback);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);
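
/*
 * Example (illustrative only; the device, block size, cache size and
 * max-held values below are made up): a metadata client would typically
 * create one block manager per device and pair every lock with an unlock.
 *
 *      struct dm_block_manager *bm;
 *      struct dm_block *b;
 *      int r;
 *
 *      bm = dm_block_manager_create(bdev, 4096, 1024, 16);
 *      if (IS_ERR(bm))
 *              return PTR_ERR(bm);
 *
 *      r = dm_bm_read_lock(bm, 0, NULL, &b);   (block 0, no validator)
 *      if (!r) {
 *              ... inspect dm_block_data(b) ...
 *              dm_bm_unlock(b);
 *      }
 *
 *      dm_block_manager_destroy(bm);
 */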

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
        return dm_bufio_client_destroy(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
        return dm_bufio_get_block_size(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
        return dm_bufio_get_device_size(to_bufio(bm));
}

static int dm_bm_validate_buffer(struct dm_block_manager *bm,
                                 struct dm_buffer *buf,
                                 struct buffer_aux *aux,
                                 struct dm_block_validator *v)
{
        if (unlikely(!aux->validator)) {
                int r;
                if (!v)
                        return 0;
                r = v->check(v, (struct dm_block *) buf, dm_bufio_get_block_size(to_bufio(bm)));
                if (unlikely(r))
                        return r;
                aux->validator = v;
        } else {
                if (unlikely(aux->validator != v)) {
                        DMERR("validator mismatch (old=%s vs new=%s) for block %llu",
                                aux->validator->name, v ? v->name : "NULL",
                                (unsigned long long)
                                        dm_bufio_get_block_number(buf));
                        return -EINVAL;
                }
        }

        return 0;
}

int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
                    struct dm_block_validator *v,
                    struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read(&aux->lock);
        if (unlikely(r)) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

int dm_bm_write_lock(struct dm_block_manager *bm,
                     dm_block_t b, struct dm_block_validator *v,
                     struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }

        aux->write_locked = 1;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_write(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

int dm_bm_read_try_lock(struct dm_block_manager *bm,
                        dm_block_t b, struct dm_block_validator *v,
                        struct dm_block **result)
{
        struct buffer_aux *aux;
        void *p;
        int r;

        p = dm_bufio_get(to_bufio(bm), b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);
        if (unlikely(!p))
                return -EWOULDBLOCK;

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_read_nonblock(&aux->lock);
        if (r < 0) {
                dm_bufio_release(to_buffer(*result));
                report_recursive_bug(b, r);
                return r;
        }
        aux->write_locked = 0;

        r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
        if (unlikely(r)) {
                bl_up_read(&aux->lock);
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        return 0;
}

int dm_bm_write_lock_zero(struct dm_block_manager *bm,
                          dm_block_t b, struct dm_block_validator *v,
                          struct dm_block **result)
{
        int r;
        struct buffer_aux *aux;
        void *p;

        p = dm_bufio_new(to_bufio(bm), b, (struct dm_buffer **) result);
        if (unlikely(IS_ERR(p)))
                return PTR_ERR(p);

        memset(p, 0, dm_bm_block_size(bm));

        aux = dm_bufio_get_aux_data(to_buffer(*result));
        r = bl_down_write(&aux->lock);
        if (r) {
                dm_bufio_release(to_buffer(*result));
                return r;
        }

        aux->write_locked = 1;
        aux->validator = v;

        return 0;
}

int dm_bm_unlock(struct dm_block *b)
{
        struct buffer_aux *aux;
        aux = dm_bufio_get_aux_data(to_buffer(b));

        if (aux->write_locked) {
                dm_bufio_mark_buffer_dirty(to_buffer(b));
                bl_up_write(&aux->lock);
        } else
                bl_up_read(&aux->lock);

        dm_bufio_release(to_buffer(b));

        return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);

int dm_bm_unlock_move(struct dm_block *b, dm_block_t n)
{
        struct buffer_aux *aux;

        aux = dm_bufio_get_aux_data(to_buffer(b));

        if (aux->write_locked) {
                dm_bufio_mark_buffer_dirty(to_buffer(b));
                bl_up_write(&aux->lock);
        } else
                bl_up_read(&aux->lock);

        dm_bufio_release_move(to_buffer(b), n);
        return 0;
}

int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
                           struct dm_block *superblock)
{
        int r;

        r = dm_bufio_write_dirty_buffers(to_bufio(bm));
        if (unlikely(r))
                return r;
        r = dm_bufio_issue_flush(to_bufio(bm));
        if (unlikely(r))
                return r;

        dm_bm_unlock(superblock);

        r = dm_bufio_write_dirty_buffers(to_bufio(bm));
        if (unlikely(r))
                return r;
        r = dm_bufio_issue_flush(to_bufio(bm));
        if (unlikely(r))
                return r;

        return 0;
}
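
/*
 * Illustrative commit sequence (not lifted from a real caller; "sb_validator"
 * and the use of block 0 for the superblock are assumptions): a client would
 * write-lock its superblock, update it, then let dm_bm_flush_and_unlock()
 * order the dirty writeback, the superblock write and the disk flushes.
 *
 *      struct dm_block *sblock;
 *
 *      r = dm_bm_write_lock(bm, 0, sb_validator, &sblock);
 *      if (r)
 *              return r;
 *      ... update the superblock via dm_block_data(sblock) ...
 *      r = dm_bm_flush_and_unlock(bm, sblock);
 */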

u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
        return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);
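
/*
 * Example (illustrative; the on-disk layout and the 0x811c9dc5 seed are made
 * up): validators typically checksum everything after the csum field itself
 * and XOR in a value unique to the structure type, so blocks of one type are
 * not mistaken for another:
 *
 *      struct disk_header {
 *              __le32 csum;
 *              __le32 flags;
 *              __le64 blocknr;
 *      } *hdr = dm_block_data(b);
 *
 *      hdr->csum = cpu_to_le32(dm_bm_checksum(&hdr->flags,
 *                                             block_size - sizeof(__le32),
 *                                             0x811c9dc5));
 */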

/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/