linux/drivers/md/dm-raid1.c
   1/*
   2 * Copyright (C) 2003 Sistina Software Limited.
   3 * Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
   4 *
   5 * This file is released under the GPL.
   6 */
   7
   8#include "dm-bio-record.h"
   9
  10#include <linux/init.h>
  11#include <linux/mempool.h>
  12#include <linux/module.h>
  13#include <linux/pagemap.h>
  14#include <linux/slab.h>
  15#include <linux/workqueue.h>
  16#include <linux/device-mapper.h>
  17#include <linux/dm-io.h>
  18#include <linux/dm-dirty-log.h>
  19#include <linux/dm-kcopyd.h>
  20#include <linux/dm-region-hash.h>
  21
  22#define DM_MSG_PREFIX "raid1"
  23
  24#define MAX_RECOVERY 1  /* Maximum number of regions recovered in parallel. */
  25
  26#define DM_RAID1_HANDLE_ERRORS 0x01
  27#define errors_handled(p)       ((p)->features & DM_RAID1_HANDLE_ERRORS)
  28
  29static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
  30
  31/*-----------------------------------------------------------------
  32 * Mirror set structures.
  33 *---------------------------------------------------------------*/
  34enum dm_raid1_error {
  35        DM_RAID1_WRITE_ERROR,
  36        DM_RAID1_FLUSH_ERROR,
  37        DM_RAID1_SYNC_ERROR,
  38        DM_RAID1_READ_ERROR
  39};
  40
  41struct mirror {
  42        struct mirror_set *ms;
  43        atomic_t error_count;
  44        unsigned long error_type;
  45        struct dm_dev *dev;
  46        sector_t offset;
  47};
  48
  49struct mirror_set {
  50        struct dm_target *ti;
  51        struct list_head list;
  52
  53        uint64_t features;
  54
  55        spinlock_t lock;        /* protects the lists */
  56        struct bio_list reads;
  57        struct bio_list writes;
  58        struct bio_list failures;
  59        struct bio_list holds;  /* bios are waiting until suspend */
  60
  61        struct dm_region_hash *rh;
  62        struct dm_kcopyd_client *kcopyd_client;
  63        struct dm_io_client *io_client;
  64
  65        /* recovery */
  66        region_t nr_regions;
  67        int in_sync;
  68        int log_failure;
  69        int leg_failure;
  70        atomic_t suspend;
  71
  72        atomic_t default_mirror;        /* Default mirror */
  73
  74        struct workqueue_struct *kmirrord_wq;
  75        struct work_struct kmirrord_work;
  76        struct timer_list timer;
  77        unsigned long timer_pending;
  78
  79        struct work_struct trigger_event;
  80
  81        unsigned nr_mirrors;
  82        struct mirror mirror[0];
  83};
  84
  85static void wakeup_mirrord(void *context)
  86{
  87        struct mirror_set *ms = context;
  88
  89        queue_work(ms->kmirrord_wq, &ms->kmirrord_work);
  90}
  91
  92static void delayed_wake_fn(unsigned long data)
  93{
  94        struct mirror_set *ms = (struct mirror_set *) data;
  95
  96        clear_bit(0, &ms->timer_pending);
  97        wakeup_mirrord(ms);
  98}
  99
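     /*
      * Coalesce wakeups: if a delayed wake is already pending do nothing,
      * otherwise arm a one-shot timer that wakes kmirrord after HZ/5
      * jiffies (a fifth of a second).
      */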
 100static void delayed_wake(struct mirror_set *ms)
 101{
 102        if (test_and_set_bit(0, &ms->timer_pending))
 103                return;
 104
 105        ms->timer.expires = jiffies + HZ / 5;
 106        ms->timer.data = (unsigned long) ms;
 107        ms->timer.function = delayed_wake_fn;
 108        add_timer(&ms->timer);
 109}
 110
 111static void wakeup_all_recovery_waiters(void *context)
 112{
 113        wake_up_all(&_kmirrord_recovery_stopped);
 114}
 115
 116static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
 117{
 118        unsigned long flags;
 119        int should_wake = 0;
 120        struct bio_list *bl;
 121
 122        bl = (rw == WRITE) ? &ms->writes : &ms->reads;
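             /*
              * Wake the worker only when the list goes from empty to
              * non-empty; kmirrord drains the whole list on each pass.
              */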
 123        spin_lock_irqsave(&ms->lock, flags);
 124        should_wake = !(bl->head);
 125        bio_list_add(bl, bio);
 126        spin_unlock_irqrestore(&ms->lock, flags);
 127
 128        if (should_wake)
 129                wakeup_mirrord(ms);
 130}
 131
 132static void dispatch_bios(void *context, struct bio_list *bio_list)
 133{
 134        struct mirror_set *ms = context;
 135        struct bio *bio;
 136
 137        while ((bio = bio_list_pop(bio_list)))
 138                queue_bio(ms, bio, WRITE);
 139}
 140
 141struct dm_raid1_bio_record {
 142        struct mirror *m;
 143        /* if details->bi_bdev == NULL, details were not saved */
 144        struct dm_bio_details details;
 145        region_t write_region;
 146};
 147
 148/*
 149 * Every mirror should look like this one.
 150 */
 151#define DEFAULT_MIRROR 0
 152
 153/*
 154 * This is yucky.  We squirrel the mirror struct away inside
  155 * bi_next for read/write bios.  This is safe since the bio
  156 * doesn't get submitted to the lower levels of the block layer.
 157 */
 158static struct mirror *bio_get_m(struct bio *bio)
 159{
 160        return (struct mirror *) bio->bi_next;
 161}
 162
 163static void bio_set_m(struct bio *bio, struct mirror *m)
 164{
 165        bio->bi_next = (struct bio *) m;
 166}
 167
 168static struct mirror *get_default_mirror(struct mirror_set *ms)
 169{
 170        return &ms->mirror[atomic_read(&ms->default_mirror)];
 171}
 172
 173static void set_default_mirror(struct mirror *m)
 174{
 175        struct mirror_set *ms = m->ms;
 176        struct mirror *m0 = &(ms->mirror[0]);
 177
 178        atomic_set(&ms->default_mirror, m - m0);
 179}
 180
 181static struct mirror *get_valid_mirror(struct mirror_set *ms)
 182{
 183        struct mirror *m;
 184
 185        for (m = ms->mirror; m < ms->mirror + ms->nr_mirrors; m++)
 186                if (!atomic_read(&m->error_count))
 187                        return m;
 188
 189        return NULL;
 190}
 191
 192/* fail_mirror
 193 * @m: mirror device to fail
  194 * @error_type: one of the enum dm_raid1_error values (DM_RAID1_*_ERROR)
 195 *
 196 * If errors are being handled, record the type of
 197 * error encountered for this device.  If this type
 198 * of error has already been recorded, we can return;
 199 * otherwise, we must signal userspace by triggering
 200 * an event.  Additionally, if the device is the
 201 * primary device, we must choose a new primary, but
 202 * only if the mirror is in-sync.
 203 *
 204 * This function must not block.
 205 */
 206static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
 207{
 208        struct mirror_set *ms = m->ms;
 209        struct mirror *new;
 210
 211        ms->leg_failure = 1;
 212
 213        /*
 214         * error_count is used for nothing more than a
 215         * simple way to tell if a device has encountered
 216         * errors.
 217         */
 218        atomic_inc(&m->error_count);
 219
 220        if (test_and_set_bit(error_type, &m->error_type))
 221                return;
 222
 223        if (!errors_handled(ms))
 224                return;
 225
 226        if (m != get_default_mirror(ms))
 227                goto out;
 228
 229        if (!ms->in_sync) {
 230                /*
  231                 * Better to issue requests to the same failing device
 232                 * than to risk returning corrupt data.
 233                 */
 234                DMERR("Primary mirror (%s) failed while out-of-sync: "
 235                      "Reads may fail.", m->dev->name);
 236                goto out;
 237        }
 238
 239        new = get_valid_mirror(ms);
 240        if (new)
 241                set_default_mirror(new);
 242        else
 243                DMWARN("All sides of mirror have failed.");
 244
 245out:
 246        schedule_work(&ms->trigger_event);
 247}
 248
 249static int mirror_flush(struct dm_target *ti)
 250{
 251        struct mirror_set *ms = ti->private;
 252        unsigned long error_bits;
 253
 254        unsigned int i;
 255        struct dm_io_region io[ms->nr_mirrors];
 256        struct mirror *m;
 257        struct dm_io_request io_req = {
 258                .bi_rw = WRITE_FLUSH,
 259                .mem.type = DM_IO_KMEM,
 260                .mem.ptr.addr = NULL,
 261                .client = ms->io_client,
 262        };
 263
 264        for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++) {
 265                io[i].bdev = m->dev->bdev;
 266                io[i].sector = 0;
 267                io[i].count = 0;
 268        }
 269
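             /*
              * A zero-length WRITE_FLUSH issued through dm-io acts as an
              * empty flush to every leg; each set bit in error_bits below
              * identifies a leg whose flush failed.
              */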
 270        error_bits = -1;
 271        dm_io(&io_req, ms->nr_mirrors, io, &error_bits);
 272        if (unlikely(error_bits != 0)) {
 273                for (i = 0; i < ms->nr_mirrors; i++)
 274                        if (test_bit(i, &error_bits))
 275                                fail_mirror(ms->mirror + i,
 276                                            DM_RAID1_FLUSH_ERROR);
 277                return -EIO;
 278        }
 279
 280        return 0;
 281}
 282
 283/*-----------------------------------------------------------------
 284 * Recovery.
 285 *
 286 * When a mirror is first activated we may find that some regions
 287 * are in the no-sync state.  We have to recover these by
 288 * recopying from the default mirror to all the others.
 289 *---------------------------------------------------------------*/
 290static void recovery_complete(int read_err, unsigned long write_err,
 291                              void *context)
 292{
 293        struct dm_region *reg = context;
 294        struct mirror_set *ms = dm_rh_region_context(reg);
 295        int m, bit = 0;
 296
 297        if (read_err) {
  298                /* A read error means the default mirror has failed. */
 299                DMERR_LIMIT("Unable to read primary mirror during recovery");
 300                fail_mirror(get_default_mirror(ms), DM_RAID1_SYNC_ERROR);
 301        }
 302
 303        if (write_err) {
 304                DMERR_LIMIT("Write error during recovery (error = 0x%lx)",
 305                            write_err);
 306                /*
 307                 * Bits correspond to devices (excluding default mirror).
 308                 * The default mirror cannot change during recovery.
 309                 */
 310                for (m = 0; m < ms->nr_mirrors; m++) {
 311                        if (&ms->mirror[m] == get_default_mirror(ms))
 312                                continue;
 313                        if (test_bit(bit, &write_err))
 314                                fail_mirror(ms->mirror + m,
 315                                            DM_RAID1_SYNC_ERROR);
 316                        bit++;
 317                }
 318        }
 319
 320        dm_rh_recovery_end(reg, !(read_err || write_err));
 321}
 322
 323static int recover(struct mirror_set *ms, struct dm_region *reg)
 324{
 325        int r;
 326        unsigned i;
 327        struct dm_io_region from, to[DM_KCOPYD_MAX_REGIONS], *dest;
 328        struct mirror *m;
 329        unsigned long flags = 0;
 330        region_t key = dm_rh_get_region_key(reg);
 331        sector_t region_size = dm_rh_get_region_size(ms->rh);
 332
 333        /* fill in the source */
 334        m = get_default_mirror(ms);
 335        from.bdev = m->dev->bdev;
 336        from.sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
 337        if (key == (ms->nr_regions - 1)) {
 338                /*
  339                 * The final region may be smaller than region_size
  340                 * (a power of two, so the mask yields the remainder).
 341                 */
 342                from.count = ms->ti->len & (region_size - 1);
 343                if (!from.count)
 344                        from.count = region_size;
 345        } else
 346                from.count = region_size;
 347
 348        /* fill in the destinations */
 349        for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
 350                if (&ms->mirror[i] == get_default_mirror(ms))
 351                        continue;
 352
 353                m = ms->mirror + i;
 354                dest->bdev = m->dev->bdev;
 355                dest->sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
 356                dest->count = from.count;
 357                dest++;
 358        }
 359
 360        /* hand to kcopyd */
 361        if (!errors_handled(ms))
 362                set_bit(DM_KCOPYD_IGNORE_ERROR, &flags);
 363
 364        r = dm_kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to,
 365                           flags, recovery_complete, reg);
 366
 367        return r;
 368}
 369
 370static void do_recovery(struct mirror_set *ms)
 371{
 372        struct dm_region *reg;
 373        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
 374        int r;
 375
 376        /*
 377         * Start quiescing some regions.
 378         */
 379        dm_rh_recovery_prepare(ms->rh);
 380
 381        /*
 382         * Copy any already quiesced regions.
 383         */
 384        while ((reg = dm_rh_recovery_start(ms->rh))) {
 385                r = recover(ms, reg);
 386                if (r)
 387                        dm_rh_recovery_end(reg, 0);
 388        }
 389
 390        /*
  391         * Update the in-sync flag.
 392         */
 393        if (!ms->in_sync &&
 394            (log->type->get_sync_count(log) == ms->nr_regions)) {
 395                /* the sync is complete */
 396                dm_table_event(ms->ti->table);
 397                ms->in_sync = 1;
 398        }
 399}
 400
 401/*-----------------------------------------------------------------
 402 * Reads
 403 *---------------------------------------------------------------*/
 404static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
 405{
 406        struct mirror *m = get_default_mirror(ms);
 407
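             /*
              * Walk backwards from the default mirror, wrapping around to
              * the last leg, until a leg with no recorded errors is found
              * or we arrive back at the default mirror.
              */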
 408        do {
 409                if (likely(!atomic_read(&m->error_count)))
 410                        return m;
 411
 412                if (m-- == ms->mirror)
 413                        m += ms->nr_mirrors;
 414        } while (m != get_default_mirror(ms));
 415
 416        return NULL;
 417}
 418
 419static int default_ok(struct mirror *m)
 420{
 421        struct mirror *default_mirror = get_default_mirror(m->ms);
 422
 423        return !atomic_read(&default_mirror->error_count);
 424}
 425
 426static int mirror_available(struct mirror_set *ms, struct bio *bio)
 427{
 428        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
 429        region_t region = dm_rh_bio_to_region(ms->rh, bio);
 430
 431        if (log->type->in_sync(log, region, 0))
 432                return choose_mirror(ms,  bio->bi_sector) ? 1 : 0;
 433
 434        return 0;
 435}
 436
 437/*
  438 * Remap a bio to a particular mirror.
 439 */
 440static sector_t map_sector(struct mirror *m, struct bio *bio)
 441{
 442        if (unlikely(!bio->bi_size))
 443                return 0;
 444        return m->offset + dm_target_offset(m->ms->ti, bio->bi_sector);
 445}
 446
 447static void map_bio(struct mirror *m, struct bio *bio)
 448{
 449        bio->bi_bdev = m->dev->bdev;
 450        bio->bi_sector = map_sector(m, bio);
 451}
 452
 453static void map_region(struct dm_io_region *io, struct mirror *m,
 454                       struct bio *bio)
 455{
 456        io->bdev = m->dev->bdev;
 457        io->sector = map_sector(m, bio);
 458        io->count = bio->bi_size >> 9;
 459}
 460
 461static void hold_bio(struct mirror_set *ms, struct bio *bio)
 462{
 463        /*
  464         * The lock is required to avoid a race with the
  465         * suspend process.
 466         */
 467        spin_lock_irq(&ms->lock);
 468
 469        if (atomic_read(&ms->suspend)) {
 470                spin_unlock_irq(&ms->lock);
 471
 472                /*
  473                 * If the device is suspended, complete the bio.
 474                 */
 475                if (dm_noflush_suspending(ms->ti))
 476                        bio_endio(bio, DM_ENDIO_REQUEUE);
 477                else
 478                        bio_endio(bio, -EIO);
 479                return;
 480        }
 481
 482        /*
 483         * Hold bio until the suspend is complete.
 484         */
 485        bio_list_add(&ms->holds, bio);
 486        spin_unlock_irq(&ms->lock);
 487}
 488
 489/*-----------------------------------------------------------------
 490 * Reads
 491 *---------------------------------------------------------------*/
 492static void read_callback(unsigned long error, void *context)
 493{
 494        struct bio *bio = context;
 495        struct mirror *m;
 496
 497        m = bio_get_m(bio);
 498        bio_set_m(bio, NULL);
 499
 500        if (likely(!error)) {
 501                bio_endio(bio, 0);
 502                return;
 503        }
 504
 505        fail_mirror(m, DM_RAID1_READ_ERROR);
 506
 507        if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
 508                DMWARN_LIMIT("Read failure on mirror device %s.  "
 509                             "Trying alternative device.",
 510                             m->dev->name);
 511                queue_bio(m->ms, bio, bio_rw(bio));
 512                return;
 513        }
 514
 515        DMERR_LIMIT("Read failure on mirror device %s.  Failing I/O.",
 516                    m->dev->name);
 517        bio_endio(bio, -EIO);
 518}
 519
 520/* Asynchronous read. */
 521static void read_async_bio(struct mirror *m, struct bio *bio)
 522{
 523        struct dm_io_region io;
 524        struct dm_io_request io_req = {
 525                .bi_rw = READ,
 526                .mem.type = DM_IO_BVEC,
 527                .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
 528                .notify.fn = read_callback,
 529                .notify.context = bio,
 530                .client = m->ms->io_client,
 531        };
 532
 533        map_region(&io, m, bio);
 534        bio_set_m(bio, m);
 535        BUG_ON(dm_io(&io_req, 1, &io, NULL));
 536}
 537
 538static inline int region_in_sync(struct mirror_set *ms, region_t region,
 539                                 int may_block)
 540{
 541        int state = dm_rh_get_state(ms->rh, region, may_block);
 542        return state == DM_RH_CLEAN || state == DM_RH_DIRTY;
 543}
 544
 545static void do_reads(struct mirror_set *ms, struct bio_list *reads)
 546{
 547        region_t region;
 548        struct bio *bio;
 549        struct mirror *m;
 550
 551        while ((bio = bio_list_pop(reads))) {
 552                region = dm_rh_bio_to_region(ms->rh, bio);
 553                m = get_default_mirror(ms);
 554
 555                /*
  556                 * We can only read balance if the region is in sync.
  557                 * Otherwise the read must use the default mirror; if
                      * that leg has failed, the read cannot be served.
                      */
 558                if (likely(region_in_sync(ms, region, 1)))
 559                        m = choose_mirror(ms, bio->bi_sector);
 560                else if (m && atomic_read(&m->error_count))
 561                        m = NULL;
 562
 563                if (likely(m))
 564                        read_async_bio(m, bio);
 565                else
 566                        bio_endio(bio, -EIO);
 567        }
 568}
 569
 570/*-----------------------------------------------------------------
 571 * Writes.
 572 *
 573 * We do different things with the write io depending on the
 574 * state of the region that it's in:
 575 *
  576 * SYNC:        increment pending, use dm-io to write to *all* mirrors
 577 * RECOVERING:  delay the io until recovery completes
 578 * NOSYNC:      increment pending, just write to the default mirror
 579 *---------------------------------------------------------------*/
 580
 581
 582static void write_callback(unsigned long error, void *context)
 583{
 584        unsigned i, ret = 0;
 585        struct bio *bio = (struct bio *) context;
 586        struct mirror_set *ms;
 587        int should_wake = 0;
 588        unsigned long flags;
 589
 590        ms = bio_get_m(bio)->ms;
 591        bio_set_m(bio, NULL);
 592
 593        /*
 594         * NOTE: We don't decrement the pending count here,
  595         * instead it is done by the target's end_io function.
 596         * This way we handle both writes to SYNC and NOSYNC
 597         * regions with the same code.
 598         */
 599        if (likely(!error)) {
 600                bio_endio(bio, ret);
 601                return;
 602        }
 603
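             /*
              * 'error' is a bit mask with one bit per mirror leg; record a
              * write failure on every leg whose bit is set.
              */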
 604        for (i = 0; i < ms->nr_mirrors; i++)
 605                if (test_bit(i, &error))
 606                        fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
 607
 608        /*
  609         * Need to raise an event.  Since raising
  610         * events can block, we need to do it in
  611         * the main (kmirrord) thread.
 612         */
 613        spin_lock_irqsave(&ms->lock, flags);
 614        if (!ms->failures.head)
 615                should_wake = 1;
 616        bio_list_add(&ms->failures, bio);
 617        spin_unlock_irqrestore(&ms->lock, flags);
 618        if (should_wake)
 619                wakeup_mirrord(ms);
 620}
 621
 622static void do_write(struct mirror_set *ms, struct bio *bio)
 623{
 624        unsigned int i;
 625        struct dm_io_region io[ms->nr_mirrors], *dest = io;
 626        struct mirror *m;
 627        struct dm_io_request io_req = {
 628                .bi_rw = WRITE | (bio->bi_rw & WRITE_FLUSH_FUA),
 629                .mem.type = DM_IO_BVEC,
 630                .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
 631                .notify.fn = write_callback,
 632                .notify.context = bio,
 633                .client = ms->io_client,
 634        };
 635
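             /*
              * Discard requests carry no data pages, so switch dm-io to a
              * NULL kernel-memory buffer.
              */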
 636        if (bio->bi_rw & REQ_DISCARD) {
 637                io_req.bi_rw |= REQ_DISCARD;
 638                io_req.mem.type = DM_IO_KMEM;
 639                io_req.mem.ptr.addr = NULL;
 640        }
 641
 642        for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
 643                map_region(dest++, m, bio);
 644
 645        /*
  646         * Use the default mirror; we only need it to retrieve the reference
 647         * to the mirror set in write_callback().
 648         */
 649        bio_set_m(bio, get_default_mirror(ms));
 650
 651        BUG_ON(dm_io(&io_req, ms->nr_mirrors, io, NULL));
 652}
 653
 654static void do_writes(struct mirror_set *ms, struct bio_list *writes)
 655{
 656        int state;
 657        struct bio *bio;
 658        struct bio_list sync, nosync, recover, *this_list = NULL;
 659        struct bio_list requeue;
 660        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
 661        region_t region;
 662
 663        if (!writes->head)
 664                return;
 665
 666        /*
 667         * Classify each write.
 668         */
 669        bio_list_init(&sync);
 670        bio_list_init(&nosync);
 671        bio_list_init(&recover);
 672        bio_list_init(&requeue);
 673
 674        while ((bio = bio_list_pop(writes))) {
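                     /*
                      * Flushes and discards are not tracked by the region
                      * hash; put them on the sync list so do_write() sends
                      * them to every leg.
                      */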
 675                if ((bio->bi_rw & REQ_FLUSH) ||
 676                    (bio->bi_rw & REQ_DISCARD)) {
 677                        bio_list_add(&sync, bio);
 678                        continue;
 679                }
 680
 681                region = dm_rh_bio_to_region(ms->rh, bio);
 682
 683                if (log->type->is_remote_recovering &&
 684                    log->type->is_remote_recovering(log, region)) {
 685                        bio_list_add(&requeue, bio);
 686                        continue;
 687                }
 688
 689                state = dm_rh_get_state(ms->rh, region, 1);
 690                switch (state) {
 691                case DM_RH_CLEAN:
 692                case DM_RH_DIRTY:
 693                        this_list = &sync;
 694                        break;
 695
 696                case DM_RH_NOSYNC:
 697                        this_list = &nosync;
 698                        break;
 699
 700                case DM_RH_RECOVERING:
 701                        this_list = &recover;
 702                        break;
 703                }
 704
 705                bio_list_add(this_list, bio);
 706        }
 707
 708        /*
 709         * Add bios that are delayed due to remote recovery
  710         * back onto the write queue.
 711         */
 712        if (unlikely(requeue.head)) {
 713                spin_lock_irq(&ms->lock);
 714                bio_list_merge(&ms->writes, &requeue);
 715                spin_unlock_irq(&ms->lock);
 716                delayed_wake(ms);
 717        }
 718
 719        /*
 720         * Increment the pending counts for any regions that will
  721         * be written to (writes to recovering regions are going to
 722         * be delayed).
 723         */
 724        dm_rh_inc_pending(ms->rh, &sync);
 725        dm_rh_inc_pending(ms->rh, &nosync);
 726
 727        /*
  728         * If the flush failed on a previous call and succeeds here,
 729         * we must not reset the log_failure variable.  We need
 730         * userspace interaction to do that.
 731         */
 732        ms->log_failure = dm_rh_flush(ms->rh) ? 1 : ms->log_failure;
 733
 734        /*
  735         * Dispatch io.  If the log has failed and we are handling
  736         * errors, sync writes cannot be recorded in the log, so hand
              * them to the failure path until userspace intervenes.
              */
 737        if (unlikely(ms->log_failure) && errors_handled(ms)) {
 738                spin_lock_irq(&ms->lock);
 739                bio_list_merge(&ms->failures, &sync);
 740                spin_unlock_irq(&ms->lock);
 741                wakeup_mirrord(ms);
 742        } else
 743                while ((bio = bio_list_pop(&sync)))
 744                        do_write(ms, bio);
 745
 746        while ((bio = bio_list_pop(&recover)))
 747                dm_rh_delay(ms->rh, bio);
 748
 749        while ((bio = bio_list_pop(&nosync))) {
 750                if (unlikely(ms->leg_failure) && errors_handled(ms)) {
 751                        spin_lock_irq(&ms->lock);
 752                        bio_list_add(&ms->failures, bio);
 753                        spin_unlock_irq(&ms->lock);
 754                        wakeup_mirrord(ms);
 755                } else {
 756                        map_bio(get_default_mirror(ms), bio);
 757                        generic_make_request(bio);
 758                }
 759        }
 760}
 761
 762static void do_failures(struct mirror_set *ms, struct bio_list *failures)
 763{
 764        struct bio *bio;
 765
 766        if (likely(!failures->head))
 767                return;
 768
 769        /*
 770         * If the log has failed, unattempted writes are being
 771         * put on the holds list.  We can't issue those writes
 772         * until a log has been marked, so we must store them.
 773         *
 774         * If a 'noflush' suspend is in progress, we can requeue
  775         * the I/Os to the core.  This gives userspace a chance
 776         * to reconfigure the mirror, at which point the core
 777         * will reissue the writes.  If the 'noflush' flag is
 778         * not set, we have no choice but to return errors.
 779         *
 780         * Some writes on the failures list may have been
 781         * submitted before the log failure and represent a
 782         * failure to write to one of the devices.  It is ok
 783         * for us to treat them the same and requeue them
 784         * as well.
 785         */
 786        while ((bio = bio_list_pop(failures))) {
 787                if (!ms->log_failure) {
 788                        ms->in_sync = 0;
 789                        dm_rh_mark_nosync(ms->rh, bio);
 790                }
 791
 792                /*
 793                 * If all the legs are dead, fail the I/O.
 794                 * If we have been told to handle errors, hold the bio
 795                 * and wait for userspace to deal with the problem.
 796                 * Otherwise pretend that the I/O succeeded. (This would
 797                 * be wrong if the failed leg returned after reboot and
 798                 * got replicated back to the good legs.)
 799                 */
 800                if (!get_valid_mirror(ms))
 801                        bio_endio(bio, -EIO);
 802                else if (errors_handled(ms))
 803                        hold_bio(ms, bio);
 804                else
 805                        bio_endio(bio, 0);
 806        }
 807}
 808
 809static void trigger_event(struct work_struct *work)
 810{
 811        struct mirror_set *ms =
 812                container_of(work, struct mirror_set, trigger_event);
 813
 814        dm_table_event(ms->ti->table);
 815}
 816
 817/*-----------------------------------------------------------------
 818 * kmirrord
 819 *---------------------------------------------------------------*/
 820static void do_mirror(struct work_struct *work)
 821{
 822        struct mirror_set *ms = container_of(work, struct mirror_set,
 823                                             kmirrord_work);
 824        struct bio_list reads, writes, failures;
 825        unsigned long flags;
 826
 827        spin_lock_irqsave(&ms->lock, flags);
 828        reads = ms->reads;
 829        writes = ms->writes;
 830        failures = ms->failures;
 831        bio_list_init(&ms->reads);
 832        bio_list_init(&ms->writes);
 833        bio_list_init(&ms->failures);
 834        spin_unlock_irqrestore(&ms->lock, flags);
 835
 836        dm_rh_update_states(ms->rh, errors_handled(ms));
 837        do_recovery(ms);
 838        do_reads(ms, &reads);
 839        do_writes(ms, &writes);
 840        do_failures(ms, &failures);
 841}
 842
 843/*-----------------------------------------------------------------
 844 * Target functions
 845 *---------------------------------------------------------------*/
 846static struct mirror_set *alloc_context(unsigned int nr_mirrors,
 847                                        uint32_t region_size,
 848                                        struct dm_target *ti,
 849                                        struct dm_dirty_log *dl)
 850{
 851        size_t len;
 852        struct mirror_set *ms = NULL;
 853
 854        len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
 855
 856        ms = kzalloc(len, GFP_KERNEL);
 857        if (!ms) {
 858                ti->error = "Cannot allocate mirror context";
 859                return NULL;
 860        }
 861
 862        spin_lock_init(&ms->lock);
 863        bio_list_init(&ms->reads);
 864        bio_list_init(&ms->writes);
 865        bio_list_init(&ms->failures);
 866        bio_list_init(&ms->holds);
 867
 868        ms->ti = ti;
 869        ms->nr_mirrors = nr_mirrors;
 870        ms->nr_regions = dm_sector_div_up(ti->len, region_size);
 871        ms->in_sync = 0;
 872        ms->log_failure = 0;
 873        ms->leg_failure = 0;
 874        atomic_set(&ms->suspend, 0);
 875        atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
 876
 877        ms->io_client = dm_io_client_create();
 878        if (IS_ERR(ms->io_client)) {
 879                ti->error = "Error creating dm_io client";
 880                kfree(ms);
 881                return NULL;
 882        }
 883
 884        ms->rh = dm_region_hash_create(ms, dispatch_bios, wakeup_mirrord,
 885                                       wakeup_all_recovery_waiters,
 886                                       ms->ti->begin, MAX_RECOVERY,
 887                                       dl, region_size, ms->nr_regions);
 888        if (IS_ERR(ms->rh)) {
 889                ti->error = "Error creating dirty region hash";
 890                dm_io_client_destroy(ms->io_client);
 891                kfree(ms);
 892                return NULL;
 893        }
 894
 895        return ms;
 896}
 897
 898static void free_context(struct mirror_set *ms, struct dm_target *ti,
 899                         unsigned int m)
 900{
 901        while (m--)
 902                dm_put_device(ti, ms->mirror[m].dev);
 903
 904        dm_io_client_destroy(ms->io_client);
 905        dm_region_hash_destroy(ms->rh);
 906        kfree(ms);
 907}
 908
 909static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
 910                      unsigned int mirror, char **argv)
 911{
 912        unsigned long long offset;
 913        char dummy;
 914
 915        if (sscanf(argv[1], "%llu%c", &offset, &dummy) != 1) {
 916                ti->error = "Invalid offset";
 917                return -EINVAL;
 918        }
 919
 920        if (dm_get_device(ti, argv[0], dm_table_get_mode(ti->table),
 921                          &ms->mirror[mirror].dev)) {
 922                ti->error = "Device lookup failure";
 923                return -ENXIO;
 924        }
 925
 926        ms->mirror[mirror].ms = ms;
 927        atomic_set(&(ms->mirror[mirror].error_count), 0);
 928        ms->mirror[mirror].error_type = 0;
 929        ms->mirror[mirror].offset = offset;
 930
 931        return 0;
 932}
 933
 934/*
 935 * Create dirty log: log_type #log_params <log_params>
 936 */
 937static struct dm_dirty_log *create_dirty_log(struct dm_target *ti,
 938                                             unsigned argc, char **argv,
 939                                             unsigned *args_used)
 940{
 941        unsigned param_count;
 942        struct dm_dirty_log *dl;
 943        char dummy;
 944
 945        if (argc < 2) {
 946                ti->error = "Insufficient mirror log arguments";
 947                return NULL;
 948        }
 949
 950        if (sscanf(argv[1], "%u%c", &param_count, &dummy) != 1) {
 951                ti->error = "Invalid mirror log argument count";
 952                return NULL;
 953        }
 954
 955        *args_used = 2 + param_count;
 956
 957        if (argc < *args_used) {
 958                ti->error = "Insufficient mirror log arguments";
 959                return NULL;
 960        }
 961
 962        dl = dm_dirty_log_create(argv[0], ti, mirror_flush, param_count,
 963                                 argv + 2);
 964        if (!dl) {
 965                ti->error = "Error creating mirror dirty log";
 966                return NULL;
 967        }
 968
 969        return dl;
 970}
 971
 972static int parse_features(struct mirror_set *ms, unsigned argc, char **argv,
 973                          unsigned *args_used)
 974{
 975        unsigned num_features;
 976        struct dm_target *ti = ms->ti;
 977        char dummy;
 978
 979        *args_used = 0;
 980
 981        if (!argc)
 982                return 0;
 983
 984        if (sscanf(argv[0], "%u%c", &num_features, &dummy) != 1) {
 985                ti->error = "Invalid number of features";
 986                return -EINVAL;
 987        }
 988
 989        argc--;
 990        argv++;
 991        (*args_used)++;
 992
 993        if (num_features > argc) {
 994                ti->error = "Not enough arguments to support feature count";
 995                return -EINVAL;
 996        }
 997
 998        if (!strcmp("handle_errors", argv[0]))
 999                ms->features |= DM_RAID1_HANDLE_ERRORS;
1000        else {
1001                ti->error = "Unrecognised feature requested";
1002                return -EINVAL;
1003        }
1004
1005        (*args_used)++;
1006
1007        return 0;
1008}
1009
1010/*
1011 * Construct a mirror mapping:
1012 *
1013 * log_type #log_params <log_params>
1014 * #mirrors [mirror_path offset]{2,}
1015 * [#features <features>]
1016 *
1017 * log_type is "core" or "disk"
1018 * #log_params is between 1 and 3
1019 *
1020 * If present, features must be "handle_errors".
1021 */
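     /*
      * Illustrative example only (not taken from this file): a two-leg
      * mirror with a core log (region size 64 sectors, nosync) and error
      * handling enabled would use target arguments such as
      *
      *   core 2 64 nosync 2 /dev/sda1 0 /dev/sdb1 0 1 handle_errors
      *
      * where /dev/sda1 and /dev/sdb1 are placeholder devices.
      */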
1022static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1023{
1024        int r;
1025        unsigned int nr_mirrors, m, args_used;
1026        struct mirror_set *ms;
1027        struct dm_dirty_log *dl;
1028        char dummy;
1029
1030        dl = create_dirty_log(ti, argc, argv, &args_used);
1031        if (!dl)
1032                return -EINVAL;
1033
1034        argv += args_used;
1035        argc -= args_used;
1036
1037        if (!argc || sscanf(argv[0], "%u%c", &nr_mirrors, &dummy) != 1 ||
1038            nr_mirrors < 2 || nr_mirrors > DM_KCOPYD_MAX_REGIONS + 1) {
1039                ti->error = "Invalid number of mirrors";
1040                dm_dirty_log_destroy(dl);
1041                return -EINVAL;
1042        }
1043
1044        argv++, argc--;
1045
1046        if (argc < nr_mirrors * 2) {
1047                ti->error = "Too few mirror arguments";
1048                dm_dirty_log_destroy(dl);
1049                return -EINVAL;
1050        }
1051
1052        ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
1053        if (!ms) {
1054                dm_dirty_log_destroy(dl);
1055                return -ENOMEM;
1056        }
1057
1058        /* Get the mirror parameter sets */
1059        for (m = 0; m < nr_mirrors; m++) {
1060                r = get_mirror(ms, ti, m, argv);
1061                if (r) {
1062                        free_context(ms, ti, m);
1063                        return r;
1064                }
1065                argv += 2;
1066                argc -= 2;
1067        }
1068
1069        ti->private = ms;
1070
1071        r = dm_set_target_max_io_len(ti, dm_rh_get_region_size(ms->rh));
1072        if (r)
1073                goto err_free_context;
1074
1075        ti->num_flush_requests = 1;
1076        ti->num_discard_requests = 1;
1077        ti->per_bio_data_size = sizeof(struct dm_raid1_bio_record);
1078        ti->discard_zeroes_data_unsupported = true;
1079
1080        ms->kmirrord_wq = alloc_workqueue("kmirrord",
1081                                          WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
1082        if (!ms->kmirrord_wq) {
1083                DMERR("couldn't start kmirrord");
1084                r = -ENOMEM;
1085                goto err_free_context;
1086        }
1087        INIT_WORK(&ms->kmirrord_work, do_mirror);
1088        init_timer(&ms->timer);
1089        ms->timer_pending = 0;
1090        INIT_WORK(&ms->trigger_event, trigger_event);
1091
1092        r = parse_features(ms, argc, argv, &args_used);
1093        if (r)
1094                goto err_destroy_wq;
1095
1096        argv += args_used;
1097        argc -= args_used;
1098
1099        /*
1100         * Any read-balancing addition depends on the
1101         * DM_RAID1_HANDLE_ERRORS flag being present.
1102         * This is because the decision to balance depends
1103         * on the sync state of a region.  If the above
 1104         * flag is not present, we ignore errors, and
 1105         * the sync state may therefore be inaccurate.
1106         */
1107
1108        if (argc) {
1109                ti->error = "Too many mirror arguments";
1110                r = -EINVAL;
1111                goto err_destroy_wq;
1112        }
1113
1114        ms->kcopyd_client = dm_kcopyd_client_create();
1115        if (IS_ERR(ms->kcopyd_client)) {
1116                r = PTR_ERR(ms->kcopyd_client);
1117                goto err_destroy_wq;
1118        }
1119
1120        wakeup_mirrord(ms);
1121        return 0;
1122
1123err_destroy_wq:
1124        destroy_workqueue(ms->kmirrord_wq);
1125err_free_context:
1126        free_context(ms, ti, ms->nr_mirrors);
1127        return r;
1128}
1129
1130static void mirror_dtr(struct dm_target *ti)
1131{
1132        struct mirror_set *ms = (struct mirror_set *) ti->private;
1133
1134        del_timer_sync(&ms->timer);
1135        flush_workqueue(ms->kmirrord_wq);
1136        flush_work(&ms->trigger_event);
1137        dm_kcopyd_client_destroy(ms->kcopyd_client);
1138        destroy_workqueue(ms->kmirrord_wq);
1139        free_context(ms, ti, ms->nr_mirrors);
1140}
1141
1142/*
1143 * Mirror mapping function
1144 */
1145static int mirror_map(struct dm_target *ti, struct bio *bio)
1146{
1147        int r, rw = bio_rw(bio);
1148        struct mirror *m;
1149        struct mirror_set *ms = ti->private;
1150        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1151        struct dm_raid1_bio_record *bio_record =
1152          dm_per_bio_data(bio, sizeof(struct dm_raid1_bio_record));
1153
1154        bio_record->details.bi_bdev = NULL;
1155
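             /*
              * Writes are never remapped here; they are queued for the
              * kmirrord worker so region state can be updated before the
              * bio is issued to every leg.
              */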
1156        if (rw == WRITE) {
1157                /* Save region for mirror_end_io() handler */
1158                bio_record->write_region = dm_rh_bio_to_region(ms->rh, bio);
1159                queue_bio(ms, bio, rw);
1160                return DM_MAPIO_SUBMITTED;
1161        }
1162
1163        r = log->type->in_sync(log, dm_rh_bio_to_region(ms->rh, bio), 0);
1164        if (r < 0 && r != -EWOULDBLOCK)
1165                return r;
1166
1167        /*
 1168         * If the region is not in-sync, queue the bio.
1169         */
1170        if (!r || (r == -EWOULDBLOCK)) {
1171                if (rw == READA)
1172                        return -EWOULDBLOCK;
1173
1174                queue_bio(ms, bio, rw);
1175                return DM_MAPIO_SUBMITTED;
1176        }
1177
1178        /*
1179         * The region is in-sync and we can perform reads directly.
1180         * Store enough information so we can retry if it fails.
1181         */
1182        m = choose_mirror(ms, bio->bi_sector);
1183        if (unlikely(!m))
1184                return -EIO;
1185
1186        dm_bio_record(&bio_record->details, bio);
1187        bio_record->m = m;
1188
1189        map_bio(m, bio);
1190
1191        return DM_MAPIO_REMAPPED;
1192}
1193
1194static int mirror_end_io(struct dm_target *ti, struct bio *bio, int error)
1195{
1196        int rw = bio_rw(bio);
1197        struct mirror_set *ms = (struct mirror_set *) ti->private;
1198        struct mirror *m = NULL;
1199        struct dm_bio_details *bd = NULL;
1200        struct dm_raid1_bio_record *bio_record =
1201          dm_per_bio_data(bio, sizeof(struct dm_raid1_bio_record));
1202
1203        /*
1204         * We need to dec pending if this was a write.
1205         */
1206        if (rw == WRITE) {
1207                if (!(bio->bi_rw & (REQ_FLUSH | REQ_DISCARD)))
1208                        dm_rh_dec(ms->rh, bio_record->write_region);
1209                return error;
1210        }
1211
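             /*
              * A rejected operation (-EOPNOTSUPP) or a failed readahead is
              * not treated as a mirror leg failure.
              */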
1212        if (error == -EOPNOTSUPP)
1213                goto out;
1214
1215        if ((error == -EWOULDBLOCK) && (bio->bi_rw & REQ_RAHEAD))
1216                goto out;
1217
1218        if (unlikely(error)) {
1219                if (!bio_record->details.bi_bdev) {
1220                        /*
1221                         * There wasn't enough memory to record necessary
1222                         * information for a retry or there was no other
1223                         * mirror in-sync.
1224                         */
1225                        DMERR_LIMIT("Mirror read failed.");
1226                        return -EIO;
1227                }
1228
1229                m = bio_record->m;
1230
1231                DMERR("Mirror read failed from %s. Trying alternative device.",
1232                      m->dev->name);
1233
1234                fail_mirror(m, DM_RAID1_READ_ERROR);
1235
1236                /*
1237                 * A failed read is requeued for another attempt using an intact
1238                 * mirror.
1239                 */
1240                if (default_ok(m) || mirror_available(ms, bio)) {
1241                        bd = &bio_record->details;
1242
1243                        dm_bio_restore(bd, bio);
1244                        bio_record->details.bi_bdev = NULL;
1245                        queue_bio(ms, bio, rw);
1246                        return DM_ENDIO_INCOMPLETE;
1247                }
1248                DMERR("All replicated volumes dead, failing I/O");
1249        }
1250
1251out:
1252        bio_record->details.bi_bdev = NULL;
1253
1254        return error;
1255}
1256
1257static void mirror_presuspend(struct dm_target *ti)
1258{
1259        struct mirror_set *ms = (struct mirror_set *) ti->private;
1260        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1261
1262        struct bio_list holds;
1263        struct bio *bio;
1264
1265        atomic_set(&ms->suspend, 1);
1266
1267        /*
 1268         * Drain the hold list.  Because ms->suspend is now set,
 1269         * hold_bio() completes each bio (requeueing it under a
 1270         * noflush suspend, failing it otherwise), and no new bio
 1271         * can be added to the hold list from this point on.
1272         */
1273        spin_lock_irq(&ms->lock);
1274        holds = ms->holds;
1275        bio_list_init(&ms->holds);
1276        spin_unlock_irq(&ms->lock);
1277
1278        while ((bio = bio_list_pop(&holds)))
1279                hold_bio(ms, bio);
1280
1281        /*
1282         * We must finish up all the work that we've
1283         * generated (i.e. recovery work).
1284         */
1285        dm_rh_stop_recovery(ms->rh);
1286
1287        wait_event(_kmirrord_recovery_stopped,
1288                   !dm_rh_recovery_in_flight(ms->rh));
1289
1290        if (log->type->presuspend && log->type->presuspend(log))
1291                /* FIXME: need better error handling */
1292                DMWARN("log presuspend failed");
1293
1294        /*
1295         * Now that recovery is complete/stopped and the
1296         * delayed bios are queued, we need to wait for
1297         * the worker thread to complete.  This way,
1298         * we know that all of our I/O has been pushed.
1299         */
1300        flush_workqueue(ms->kmirrord_wq);
1301}
1302
1303static void mirror_postsuspend(struct dm_target *ti)
1304{
1305        struct mirror_set *ms = ti->private;
1306        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1307
1308        if (log->type->postsuspend && log->type->postsuspend(log))
1309                /* FIXME: need better error handling */
1310                DMWARN("log postsuspend failed");
1311}
1312
1313static void mirror_resume(struct dm_target *ti)
1314{
1315        struct mirror_set *ms = ti->private;
1316        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1317
1318        atomic_set(&ms->suspend, 0);
1319        if (log->type->resume && log->type->resume(log))
1320                /* FIXME: need better error handling */
1321                DMWARN("log resume failed");
1322        dm_rh_start_recovery(ms->rh);
1323}
1324
1325/*
1326 * device_status_char
1327 * @m: mirror device/leg we want the status of
1328 *
1329 * We return one character representing the most severe error
1330 * we have encountered.
 1331 *    A => Alive - No failures
      *    F => Flush - A flush failure occurred
 1332 *    D => Dead - A write failure occurred leaving mirror out-of-sync
 1333 *    S => Sync - A synchronization failure occurred, mirror out-of-sync
 1334 *    R => Read - A read failure occurred, mirror data unaffected
1335 *
1336 * Returns: <char>
1337 */
1338static char device_status_char(struct mirror *m)
1339{
1340        if (!atomic_read(&(m->error_count)))
1341                return 'A';
1342
1343        return (test_bit(DM_RAID1_FLUSH_ERROR, &(m->error_type))) ? 'F' :
1344                (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
1345                (test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
1346                (test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
1347}
1348
1349
1350static void mirror_status(struct dm_target *ti, status_type_t type,
1351                          unsigned status_flags, char *result, unsigned maxlen)
1352{
1353        unsigned int m, sz = 0;
1354        struct mirror_set *ms = (struct mirror_set *) ti->private;
1355        struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1356        char buffer[ms->nr_mirrors + 1];
1357
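             /*
              * STATUSTYPE_INFO emits: <#mirrors> <dev>...
              * <in-sync regions>/<total regions> 1 <one health char per leg>
              * followed by the dirty log status.
              */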
1358        switch (type) {
1359        case STATUSTYPE_INFO:
1360                DMEMIT("%d ", ms->nr_mirrors);
1361                for (m = 0; m < ms->nr_mirrors; m++) {
1362                        DMEMIT("%s ", ms->mirror[m].dev->name);
1363                        buffer[m] = device_status_char(&(ms->mirror[m]));
1364                }
1365                buffer[m] = '\0';
1366
1367                DMEMIT("%llu/%llu 1 %s ",
1368                      (unsigned long long)log->type->get_sync_count(log),
1369                      (unsigned long long)ms->nr_regions, buffer);
1370
1371                sz += log->type->status(log, type, result+sz, maxlen-sz);
1372
1373                break;
1374
1375        case STATUSTYPE_TABLE:
1376                sz = log->type->status(log, type, result, maxlen);
1377
1378                DMEMIT("%d", ms->nr_mirrors);
1379                for (m = 0; m < ms->nr_mirrors; m++)
1380                        DMEMIT(" %s %llu", ms->mirror[m].dev->name,
1381                               (unsigned long long)ms->mirror[m].offset);
1382
1383                if (ms->features & DM_RAID1_HANDLE_ERRORS)
1384                        DMEMIT(" 1 handle_errors");
1385        }
1386}
1387
1388static int mirror_iterate_devices(struct dm_target *ti,
1389                                  iterate_devices_callout_fn fn, void *data)
1390{
1391        struct mirror_set *ms = ti->private;
1392        int ret = 0;
1393        unsigned i;
1394
1395        for (i = 0; !ret && i < ms->nr_mirrors; i++)
1396                ret = fn(ti, ms->mirror[i].dev,
1397                         ms->mirror[i].offset, ti->len, data);
1398
1399        return ret;
1400}
1401
1402static struct target_type mirror_target = {
1403        .name    = "mirror",
1404        .version = {1, 13, 2},
1405        .module  = THIS_MODULE,
1406        .ctr     = mirror_ctr,
1407        .dtr     = mirror_dtr,
1408        .map     = mirror_map,
1409        .end_io  = mirror_end_io,
1410        .presuspend = mirror_presuspend,
1411        .postsuspend = mirror_postsuspend,
1412        .resume  = mirror_resume,
1413        .status  = mirror_status,
1414        .iterate_devices = mirror_iterate_devices,
1415};
1416
1417static int __init dm_mirror_init(void)
1418{
1419        int r;
1420
1421        r = dm_register_target(&mirror_target);
1422        if (r < 0) {
1423                DMERR("Failed to register mirror target");
1424                goto bad_target;
1425        }
1426
1427        return 0;
1428
1429bad_target:
1430        return r;
1431}
1432
1433static void __exit dm_mirror_exit(void)
1434{
1435        dm_unregister_target(&mirror_target);
1436}
1437
1438/* Module hooks */
1439module_init(dm_mirror_init);
1440module_exit(dm_mirror_exit);
1441
1442MODULE_DESCRIPTION(DM_NAME " mirror target");
1443MODULE_AUTHOR("Joe Thornber");
1444MODULE_LICENSE("GPL");
1445