linux/drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

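/*
 * Example usage (an illustrative, uncompiled sketch: "my_done" and
 * "my_copy" are hypothetical, and <linux/completion.h> is assumed).
 * A client copies one region of a source device to a destination and
 * waits for the asynchronous completion notification.
 */
#if 0
static void my_done(int read_err, unsigned long write_err, void *context)
{
        /* Runs from the kcopyd workqueue once all destinations are written. */
        complete((struct completion *) context);
}

static int my_copy(struct block_device *src_bdev, struct block_device *dst_bdev,
                   sector_t sector, sector_t count)
{
        struct dm_kcopyd_client *kc = dm_kcopyd_client_create();
        struct dm_io_region from = {
                .bdev = src_bdev, .sector = sector, .count = count
        };
        struct dm_io_region to = {
                .bdev = dst_bdev, .sector = sector, .count = count
        };
        DECLARE_COMPLETION_ONSTACK(done);

        if (IS_ERR(kc))
                return PTR_ERR(kc);

        dm_kcopyd_copy(kc, &from, 1, &to, 0, my_done, &done);
        wait_for_completion(&done);
        dm_kcopyd_client_destroy(kc);
        return 0;
}
#endif
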
#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
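
/*
 * SUB_JOB_SIZE is in 512-byte sectors: each sub job moves
 * SUB_JOB_SIZE << SECTOR_SHIFT = 64KiB, so with the common 4KiB
 * PAGE_SIZE, RESERVE_PAGES works out to 16 pages per client.
 */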

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;

        mempool_t *job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
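/*
 * A copy job normally moves from pages_jobs to io_jobs for the read,
 * is requeued on io_jobs by complete_io() for the write, and finally
 * lands on complete_jobs, where run_complete_job() calls the caller's
 * notify fn.
 */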
        spinlock_t job_lock;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * READ or WRITE; the WRITE may carry REQ_WRITE_SAME when zeroing.
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;

        struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
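        /*
         * Each cache object is an array of one master job followed by
         * SPLIT_COUNT sub jobs: dm_kcopyd_copy() hands out the master
         * and split_job() indexes master_job[1..SPLIT_COUNT].
         */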
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        zero_page_list.next = &zero_page_list;
        zero_page_list.page = ZERO_PAGE(0);

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job on a given job list: pop takes from
 * the head, push appends at the tail and push_head reinserts at the
 * head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                job = list_entry(jobs->next, struct kcopyd_job, list);
                list_del(&job->list);
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages && job->pages != &zero_page_list)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job)
                mempool_free(job, kc->job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        if (error) {
                if (job->rw & WRITE)
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (job->rw & WRITE)
                push(&kc->complete_jobs, job);
        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Issue the io for a particular job: either the single read of the
 * source region, or the writes to all destination regions.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_rw = job->rw,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = 0,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;
        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (job->rw & WRITE)
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;

        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->complete_jobs, job);
        else if (job->pages == &zero_page_list)
                push(&kc->io_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}
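
/*
 * Worked example: a 1MiB copy is 2048 sectors, i.e. 16 chunks of
 * SUB_JOB_SIZE (128 sectors).  split_job() seeds SPLIT_COUNT (8) sub
 * jobs, and each sub job that finishes a chunk claims the next one in
 * segment_complete() until job->progress reaches source.count, so at
 * most 8 chunks are in flight at a time.
 */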

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                   unsigned int num_dests, struct dm_io_region *dests,
                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;
        int i;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        if (from) {
                job->source = *from;
                job->pages = NULL;
                job->rw = READ;
        } else {
                memset(&job->source, 0, sizeof job->source);
                job->source.count = job->dests[0].count;
                job->pages = &zero_page_list;

                /*
                 * Use WRITE SAME to optimize zeroing if all dests support it.
                 */
                job->rw = WRITE | REQ_WRITE_SAME;
                for (i = 0; i < job->num_dests; i++)
                        if (!bdev_write_same(job->dests[i].bdev)) {
                                job->rw = WRITE;
                                break;
                        }
        }

        job->fn = fn;
        job->context = context;
        job->master_job = job;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
                   unsigned num_dests, struct dm_io_region *dests,
                   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
        return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                                 dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        memset(job, 0, sizeof(struct kcopyd_job));
        job->kc = kc;
        job->fn = fn;
        job->context = context;
        job->master_job = job;

        atomic_inc(&kc->nr_jobs);

        return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
        struct kcopyd_job *job = j;
        struct dm_kcopyd_client *kc = job->kc;

        job->read_err = read_err;
        job->write_err = write_err;

        push(&kc->complete_jobs, job);
        wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
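
/*
 * Sketch of the prepare/do_callback pair (illustrative, uncompiled;
 * "my_notify" and "my_context" are hypothetical).  It lets a caller
 * that issued its own io have the notification run from the kcopyd
 * workqueue, serialized with the completions of ordinary kcopyd jobs.
 */
#if 0
static void my_defer_callback(struct dm_kcopyd_client *kc)
{
        void *token = dm_kcopyd_prepare_callback(kc, my_notify, my_context);

        /* ... issue io by other means; once it has completed: ... */
        dm_kcopyd_do_callback(token, 0, 0);
        /* my_notify(0, 0, my_context) now runs from the kcopyd thread. */
}
#endif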

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
        int r = -ENOMEM;
        struct dm_kcopyd_client *kc;

        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);

        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!kc->job_pool)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd",
                                        WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq)
                goto bad_workqueue;

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_destroy(kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_destroy(kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);