linux/drivers/md/dm-kcopyd.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2002 Sistina Software (UK) Limited.
   3 * Copyright (C) 2006 Red Hat GmbH
   4 *
   5 * This file is released under the GPL.
   6 *
   7 * Kcopyd provides a simple interface for copying an area of one
   8 * block-device to one or more other block-devices, with an asynchronous
   9 * completion notification.
  10 */
  11
  12#include <linux/types.h>
  13#include <asm/atomic.h>
  14#include <linux/blkdev.h>
  15#include <linux/fs.h>
  16#include <linux/init.h>
  17#include <linux/list.h>
  18#include <linux/mempool.h>
  19#include <linux/module.h>
  20#include <linux/pagemap.h>
  21#include <linux/slab.h>
  22#include <linux/vmalloc.h>
  23#include <linux/workqueue.h>
  24#include <linux/mutex.h>
  25#include <linux/device-mapper.h>
  26#include <linux/dm-kcopyd.h>
  27
  28#include "dm.h"
  29
  30/*-----------------------------------------------------------------
  31 * Each kcopyd client has its own little pool of preallocated
  32 * pages for kcopyd io.
  33 *---------------------------------------------------------------*/
  34struct dm_kcopyd_client {
  35        spinlock_t lock;
  36        struct page_list *pages;
  37        unsigned int nr_pages;
  38        unsigned int nr_free_pages;
  39
  40        struct dm_io_client *io_client;
  41
  42        wait_queue_head_t destroyq;
  43        atomic_t nr_jobs;
  44
  45        mempool_t *job_pool;
  46
  47        struct workqueue_struct *kcopyd_wq;
  48        struct work_struct kcopyd_work;
  49
  50/*
  51 * We maintain three lists of jobs:
  52 *
  53 * i)   jobs waiting for pages
  54 * ii)  jobs that have pages, and are waiting for the io to be issued.
  55 * iii) jobs that have completed.
  56 *
  57 * All three of these are protected by job_lock.
  58 */
  59        spinlock_t job_lock;
  60        struct list_head complete_jobs;
  61        struct list_head io_jobs;
  62        struct list_head pages_jobs;
  63};
  64
  65static void wake(struct dm_kcopyd_client *kc)
  66{
  67        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
  68}
  69
  70static struct page_list *alloc_pl(void)
  71{
  72        struct page_list *pl;
  73
  74        pl = kmalloc(sizeof(*pl), GFP_KERNEL);
  75        if (!pl)
  76                return NULL;
  77
  78        pl->page = alloc_page(GFP_KERNEL);
  79        if (!pl->page) {
  80                kfree(pl);
  81                return NULL;
  82        }
  83
  84        return pl;
  85}
  86
  87static void free_pl(struct page_list *pl)
  88{
  89        __free_page(pl->page);
  90        kfree(pl);
  91}
  92
  93static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
  94                            unsigned int nr, struct page_list **pages)
  95{
  96        struct page_list *pl;
  97
  98        spin_lock(&kc->lock);
  99        if (kc->nr_free_pages < nr) {
 100                spin_unlock(&kc->lock);
 101                return -ENOMEM;
 102        }
 103
 104        kc->nr_free_pages -= nr;
 105        for (*pages = pl = kc->pages; --nr; pl = pl->next)
 106                ;
 107
 108        kc->pages = pl->next;
 109        pl->next = NULL;
 110
 111        spin_unlock(&kc->lock);
 112
 113        return 0;
 114}
 115
 116static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
 117{
 118        struct page_list *cursor;
 119
 120        spin_lock(&kc->lock);
 121        for (cursor = pl; cursor->next; cursor = cursor->next)
 122                kc->nr_free_pages++;
 123
 124        kc->nr_free_pages++;
 125        cursor->next = kc->pages;
 126        kc->pages = pl;
 127        spin_unlock(&kc->lock);
 128}
 129
 130/*
 131 * These three functions resize the page pool.
 132 */
 133static void drop_pages(struct page_list *pl)
 134{
 135        struct page_list *next;
 136
 137        while (pl) {
 138                next = pl->next;
 139                free_pl(pl);
 140                pl = next;
 141        }
 142}
 143
 144static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
 145{
 146        unsigned int i;
 147        struct page_list *pl = NULL, *next;
 148
 149        for (i = 0; i < nr; i++) {
 150                next = alloc_pl();
 151                if (!next) {
 152                        if (pl)
 153                                drop_pages(pl);
 154                        return -ENOMEM;
 155                }
 156                next->next = pl;
 157                pl = next;
 158        }
 159
 160        kcopyd_put_pages(kc, pl);
 161        kc->nr_pages += nr;
 162        return 0;
 163}
 164
 165static void client_free_pages(struct dm_kcopyd_client *kc)
 166{
 167        BUG_ON(kc->nr_free_pages != kc->nr_pages);
 168        drop_pages(kc->pages);
 169        kc->pages = NULL;
 170        kc->nr_free_pages = kc->nr_pages = 0;
 171}
 172
 173/*-----------------------------------------------------------------
 174 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 175 * for this reason we use a mempool to prevent the client from
 176 * ever having to do io (which could cause a deadlock).
 177 *---------------------------------------------------------------*/
 178struct kcopyd_job {
 179        struct dm_kcopyd_client *kc;
 180        struct list_head list;
 181        unsigned long flags;
 182
 183        /*
 184         * Error state of the job.
 185         */
 186        int read_err;
 187        unsigned long write_err;
 188
 189        /*
 190         * Either READ or WRITE
 191         */
 192        int rw;
 193        struct dm_io_region source;
 194
 195        /*
 196         * The destinations for the transfer.
 197         */
 198        unsigned int num_dests;
 199        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
 200
 201        sector_t offset;
 202        unsigned int nr_pages;
 203        struct page_list *pages;
 204
 205        /*
 206         * Set this to ensure you are notified when the job has
 207         * completed.  'context' is for callback to use.
 208         */
 209        dm_kcopyd_notify_fn fn;
 210        void *context;
 211
 212        /*
 213         * These fields are only used if the job has been split
 214         * into more manageable parts.
 215         */
 216        struct mutex lock;
 217        atomic_t sub_jobs;
 218        sector_t progress;
 219};
 220
 221/* FIXME: this should scale with the number of pages */
 222#define MIN_JOBS 512
 223
 224static struct kmem_cache *_job_cache;
 225
 226int __init dm_kcopyd_init(void)
 227{
 228        _job_cache = KMEM_CACHE(kcopyd_job, 0);
 229        if (!_job_cache)
 230                return -ENOMEM;
 231
 232        return 0;
 233}
 234
 235void dm_kcopyd_exit(void)
 236{
 237        kmem_cache_destroy(_job_cache);
 238        _job_cache = NULL;
 239}
 240
/*
 * Functions to pop a job from the head of a given job list, and to
 * push a job onto its tail (push) or its head (push_head).
 */
 245static struct kcopyd_job *pop(struct list_head *jobs,
 246                              struct dm_kcopyd_client *kc)
 247{
 248        struct kcopyd_job *job = NULL;
 249        unsigned long flags;
 250
 251        spin_lock_irqsave(&kc->job_lock, flags);
 252
 253        if (!list_empty(jobs)) {
 254                job = list_entry(jobs->next, struct kcopyd_job, list);
 255                list_del(&job->list);
 256        }
 257        spin_unlock_irqrestore(&kc->job_lock, flags);
 258
 259        return job;
 260}
 261
 262static void push(struct list_head *jobs, struct kcopyd_job *job)
 263{
 264        unsigned long flags;
 265        struct dm_kcopyd_client *kc = job->kc;
 266
 267        spin_lock_irqsave(&kc->job_lock, flags);
 268        list_add_tail(&job->list, jobs);
 269        spin_unlock_irqrestore(&kc->job_lock, flags);
 270}
 271
 272
 273static void push_head(struct list_head *jobs, struct kcopyd_job *job)
 274{
 275        unsigned long flags;
 276        struct dm_kcopyd_client *kc = job->kc;
 277
 278        spin_lock_irqsave(&kc->job_lock, flags);
 279        list_add(&job->list, jobs);
 280        spin_unlock_irqrestore(&kc->job_lock, flags);
 281}
 282
 283/*
 284 * These three functions process 1 item from the corresponding
 285 * job list.
 286 *
 287 * They return:
 288 * < 0: error
 289 *   0: success
 290 * > 0: can't process yet.
 291 */
 292static int run_complete_job(struct kcopyd_job *job)
 293{
 294        void *context = job->context;
 295        int read_err = job->read_err;
 296        unsigned long write_err = job->write_err;
 297        dm_kcopyd_notify_fn fn = job->fn;
 298        struct dm_kcopyd_client *kc = job->kc;
 299
 300        kcopyd_put_pages(kc, job->pages);
 301        mempool_free(job, kc->job_pool);
 302        fn(read_err, write_err, context);
 303
 304        if (atomic_dec_and_test(&kc->nr_jobs))
 305                wake_up(&kc->destroyq);
 306
 307        return 0;
 308}
 309
 310static void complete_io(unsigned long error, void *context)
 311{
 312        struct kcopyd_job *job = (struct kcopyd_job *) context;
 313        struct dm_kcopyd_client *kc = job->kc;
 314
 315        if (error) {
 316                if (job->rw == WRITE)
 317                        job->write_err |= error;
 318                else
 319                        job->read_err = 1;
 320
 321                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 322                        push(&kc->complete_jobs, job);
 323                        wake(kc);
 324                        return;
 325                }
 326        }
 327
 328        if (job->rw == WRITE)
 329                push(&kc->complete_jobs, job);
 330
 331        else {
 332                job->rw = WRITE;
 333                push(&kc->io_jobs, job);
 334        }
 335
 336        wake(kc);
 337}
 338
 339/*
 340 * Request io on as many buffer heads as we can currently get for
 341 * a particular job.
 342 */
 343static int run_io_job(struct kcopyd_job *job)
 344{
 345        int r;
 346        struct dm_io_request io_req = {
 347                .bi_rw = job->rw | (1 << BIO_RW_SYNC),
 348                .mem.type = DM_IO_PAGE_LIST,
 349                .mem.ptr.pl = job->pages,
 350                .mem.offset = job->offset,
 351                .notify.fn = complete_io,
 352                .notify.context = job,
 353                .client = job->kc->io_client,
 354        };
 355
 356        if (job->rw == READ)
 357                r = dm_io(&io_req, 1, &job->source, NULL);
 358        else
 359                r = dm_io(&io_req, job->num_dests, job->dests, NULL);
 360
 361        return r;
 362}
 363
 364static int run_pages_job(struct kcopyd_job *job)
 365{
 366        int r;
 367
 368        job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
 369                                  PAGE_SIZE >> 9);
 370        r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
 371        if (!r) {
 372                /* this job is ready for io */
 373                push(&job->kc->io_jobs, job);
 374                return 0;
 375        }
 376
 377        if (r == -ENOMEM)
 378                /* can't complete now */
 379                return 1;
 380
 381        return r;
 382}
 383
 384/*
 385 * Run through a list for as long as possible.  Returns the count
 386 * of successful jobs.
 387 */
 388static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
 389                        int (*fn) (struct kcopyd_job *))
 390{
 391        struct kcopyd_job *job;
 392        int r, count = 0;
 393
 394        while ((job = pop(jobs, kc))) {
 395
 396                r = fn(job);
 397
 398                if (r < 0) {
 399                        /* error this rogue job */
 400                        if (job->rw == WRITE)
 401                                job->write_err = (unsigned long) -1L;
 402                        else
 403                                job->read_err = 1;
 404                        push(&kc->complete_jobs, job);
 405                        break;
 406                }
 407
 408                if (r > 0) {
 409                        /*
 410                         * We couldn't service this job ATM, so
 411                         * push this job back onto the list.
 412                         */
 413                        push_head(jobs, job);
 414                        break;
 415                }
 416
 417                count++;
 418        }
 419
 420        return count;
 421}
 422
 423/*
 424 * kcopyd does this every time it's woken up.
 425 */
 426static void do_work(struct work_struct *work)
 427{
 428        struct dm_kcopyd_client *kc = container_of(work,
 429                                        struct dm_kcopyd_client, kcopyd_work);
 430
 431        /*
 432         * The order that these are called is *very* important.
 433         * complete jobs can free some pages for pages jobs.
 434         * Pages jobs when successful will jump onto the io jobs
 435         * list.  io jobs call wake when they complete and it all
 436         * starts again.
 437         */
 438        process_jobs(&kc->complete_jobs, kc, run_complete_job);
 439        process_jobs(&kc->pages_jobs, kc, run_pages_job);
 440        process_jobs(&kc->io_jobs, kc, run_io_job);
 441}
 442
 443/*
 444 * If we are copying a small region we just dispatch a single job
 445 * to do the copy, otherwise the io has to be split up into many
 446 * jobs.
 447 */
 448static void dispatch_job(struct kcopyd_job *job)
 449{
 450        struct dm_kcopyd_client *kc = job->kc;
 451        atomic_inc(&kc->nr_jobs);
 452        push(&kc->pages_jobs, job);
 453        wake(kc);
 454}
 455
 456#define SUB_JOB_SIZE 128
 457static void segment_complete(int read_err, unsigned long write_err,
 458                             void *context)
 459{
 460        /* FIXME: tidy this function */
 461        sector_t progress = 0;
 462        sector_t count = 0;
 463        struct kcopyd_job *job = (struct kcopyd_job *) context;
 464
 465        mutex_lock(&job->lock);
 466
 467        /* update the error */
 468        if (read_err)
 469                job->read_err = 1;
 470
 471        if (write_err)
 472                job->write_err |= write_err;
 473
 474        /*
 475         * Only dispatch more work if there hasn't been an error.
 476         */
 477        if ((!job->read_err && !job->write_err) ||
 478            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 479                /* get the next chunk of work */
 480                progress = job->progress;
 481                count = job->source.count - progress;
 482                if (count) {
 483                        if (count > SUB_JOB_SIZE)
 484                                count = SUB_JOB_SIZE;
 485
 486                        job->progress += count;
 487                }
 488        }
 489        mutex_unlock(&job->lock);
 490
 491        if (count) {
 492                int i;
 493                struct kcopyd_job *sub_job = mempool_alloc(job->kc->job_pool,
 494                                                           GFP_NOIO);
 495
 496                *sub_job = *job;
 497                sub_job->source.sector += progress;
 498                sub_job->source.count = count;
 499
 500                for (i = 0; i < job->num_dests; i++) {
 501                        sub_job->dests[i].sector += progress;
 502                        sub_job->dests[i].count = count;
 503                }
 504
 505                sub_job->fn = segment_complete;
 506                sub_job->context = job;
 507                dispatch_job(sub_job);
 508
 509        } else if (atomic_dec_and_test(&job->sub_jobs)) {
 510
 511                /*
 512                 * To avoid a race we must keep the job around
 513                 * until after the notify function has completed.
 514                 * Otherwise the client may try and stop the job
 515                 * after we've completed.
 516                 */
 517                job->fn(read_err, write_err, job->context);
 518                mempool_free(job, job->kc->job_pool);
 519        }
 520}
 521
 522/*
 523 * Create some little jobs that will do the move between
 524 * them.
 525 */
 526#define SPLIT_COUNT 8
 527static void split_job(struct kcopyd_job *job)
 528{
 529        int i;
 530
 531        atomic_set(&job->sub_jobs, SPLIT_COUNT);
 532        for (i = 0; i < SPLIT_COUNT; i++)
 533                segment_complete(0, 0u, job);
 534}
 535
 536int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
 537                   unsigned int num_dests, struct dm_io_region *dests,
 538                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
 539{
 540        struct kcopyd_job *job;
 541
 542        /*
 543         * Allocate a new job.
 544         */
 545        job = mempool_alloc(kc->job_pool, GFP_NOIO);
 546
 547        /*
 548         * set up for the read.
 549         */
 550        job->kc = kc;
 551        job->flags = flags;
 552        job->read_err = 0;
 553        job->write_err = 0;
 554        job->rw = READ;
 555
 556        job->source = *from;
 557
 558        job->num_dests = num_dests;
 559        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 560
 561        job->offset = 0;
 562        job->nr_pages = 0;
 563        job->pages = NULL;
 564
 565        job->fn = fn;
 566        job->context = context;
 567
 568        if (job->source.count < SUB_JOB_SIZE)
 569                dispatch_job(job);
 570
 571        else {
 572                mutex_init(&job->lock);
 573                job->progress = 0;
 574                split_job(job);
 575        }
 576
 577        return 0;
 578}
 579EXPORT_SYMBOL(dm_kcopyd_copy);
 580
 581/*
 582 * Cancels a kcopyd job, eg. someone might be deactivating a
 583 * mirror.
 584 */
 585#if 0
 586int kcopyd_cancel(struct kcopyd_job *job, int block)
 587{
 588        /* FIXME: finish */
 589        return -1;
 590}
 591#endif  /*  0  */
 592
 593/*-----------------------------------------------------------------
 594 * Client setup
 595 *---------------------------------------------------------------*/
 596int dm_kcopyd_client_create(unsigned int nr_pages,
 597                            struct dm_kcopyd_client **result)
 598{
 599        int r = -ENOMEM;
 600        struct dm_kcopyd_client *kc;
 601
 602        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
 603        if (!kc)
 604                return -ENOMEM;
 605
 606        spin_lock_init(&kc->lock);
 607        spin_lock_init(&kc->job_lock);
 608        INIT_LIST_HEAD(&kc->complete_jobs);
 609        INIT_LIST_HEAD(&kc->io_jobs);
 610        INIT_LIST_HEAD(&kc->pages_jobs);
 611
 612        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
 613        if (!kc->job_pool)
 614                goto bad_slab;
 615
 616        INIT_WORK(&kc->kcopyd_work, do_work);
 617        kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
 618        if (!kc->kcopyd_wq)
 619                goto bad_workqueue;
 620
 621        kc->pages = NULL;
 622        kc->nr_pages = kc->nr_free_pages = 0;
 623        r = client_alloc_pages(kc, nr_pages);
 624        if (r)
 625                goto bad_client_pages;
 626
 627        kc->io_client = dm_io_client_create(nr_pages);
 628        if (IS_ERR(kc->io_client)) {
 629                r = PTR_ERR(kc->io_client);
 630                goto bad_io_client;
 631        }
 632
 633        init_waitqueue_head(&kc->destroyq);
 634        atomic_set(&kc->nr_jobs, 0);
 635
 636        *result = kc;
 637        return 0;
 638
 639bad_io_client:
 640        client_free_pages(kc);
 641bad_client_pages:
 642        destroy_workqueue(kc->kcopyd_wq);
 643bad_workqueue:
 644        mempool_destroy(kc->job_pool);
 645bad_slab:
 646        kfree(kc);
 647
 648        return r;
 649}
 650EXPORT_SYMBOL(dm_kcopyd_client_create);
 651
 652void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
 653{
 654        /* Wait for completion of all jobs submitted by this client. */
 655        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 656
 657        BUG_ON(!list_empty(&kc->complete_jobs));
 658        BUG_ON(!list_empty(&kc->io_jobs));
 659        BUG_ON(!list_empty(&kc->pages_jobs));
 660        destroy_workqueue(kc->kcopyd_wq);
 661        dm_io_client_destroy(kc->io_client);
 662        client_free_pages(kc);
 663        mempool_destroy(kc->job_pool);
 664        kfree(kc);
 665}
 666EXPORT_SYMBOL(dm_kcopyd_client_destroy);
 667