linux-bk/fs/aio.c
<<
>>
Prefs
   1/*
   2 *      An async IO implementation for Linux
   3 *      Written by Benjamin LaHaise <bcrl@redhat.com>
   4 *
   5 *      Implements an efficient asynchronous io interface.
   6 *
   7 *      Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
   8 *
   9 *      See ../COPYING for licensing terms.
  10 */
  11#include <linux/kernel.h>
  12#include <linux/init.h>
  13#include <linux/errno.h>
  14#include <linux/time.h>
  15#include <linux/aio_abi.h>
  16#include <linux/module.h>
  17
  18//#define DEBUG 1
  19
  20#include <linux/sched.h>
  21#include <linux/fs.h>
  22#include <linux/file.h>
  23#include <linux/mm.h>
  24#include <linux/mman.h>
  25#include <linux/slab.h>
  26#include <linux/timer.h>
  27#include <linux/aio.h>
  28#include <linux/highmem.h>
  29#include <linux/workqueue.h>
  30
  31#include <asm/kmap_types.h>
  32#include <asm/uaccess.h>
  33#include <asm/mmu_context.h>
  34
  35#if DEBUG > 1
  36#define dprintk         printk
  37#else
  38#define dprintk(x...)   do { ; } while (0)
  39#endif
  40
  41/*------ sysctl variables----*/
  42atomic_t aio_nr = ATOMIC_INIT(0);       /* current system wide number of aio requests */
  43unsigned aio_max_nr = 0x10000;  /* system wide maximum number of aio requests */
  44/*----end sysctl variables---*/
  45
  46static kmem_cache_t     *kiocb_cachep;
  47static kmem_cache_t     *kioctx_cachep;
  48
  49static struct workqueue_struct *aio_wq;
  50
  51/* Used for rare fput completion. */
  52static void aio_fput_routine(void *);
  53static DECLARE_WORK(fput_work, aio_fput_routine, NULL);
  54
  55static spinlock_t       fput_lock = SPIN_LOCK_UNLOCKED;
  56LIST_HEAD(fput_head);
  57
  58static void aio_kick_handler(void *);
  59
  60/* aio_setup
  61 *      Creates the slab caches used by the aio routines, panic on
  62 *      failure as this is done early during the boot sequence.
  63 */
  64static int __init aio_setup(void)
  65{
  66        kiocb_cachep = kmem_cache_create("kiocb", sizeof(struct kiocb),
  67                                0, SLAB_HWCACHE_ALIGN, NULL, NULL);
  68        if (!kiocb_cachep)
  69                panic("unable to create kiocb cache\n");
  70
  71        kioctx_cachep = kmem_cache_create("kioctx", sizeof(struct kioctx),
  72                                0, SLAB_HWCACHE_ALIGN, NULL, NULL);
  73        if (!kioctx_cachep)
  74                panic("unable to create kioctx cache");
  75
  76        aio_wq = create_workqueue("aio");
  77
  78        pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
  79
  80        return 0;
  81}
  82
  83static void aio_free_ring(struct kioctx *ctx)
  84{
  85        struct aio_ring_info *info = &ctx->ring_info;
  86        long i;
  87
  88        for (i=0; i<info->nr_pages; i++)
  89                put_page(info->ring_pages[i]);
  90
  91        if (info->mmap_size) {
  92                down_write(&ctx->mm->mmap_sem);
  93                do_munmap(ctx->mm, info->mmap_base, info->mmap_size);
  94                up_write(&ctx->mm->mmap_sem);
  95        }
  96
  97        if (info->ring_pages && info->ring_pages != info->internal_pages)
  98                kfree(info->ring_pages);
  99        info->ring_pages = NULL;
 100        info->nr = 0;
 101}
 102
 103static int aio_setup_ring(struct kioctx *ctx)
 104{
 105        struct aio_ring *ring;
 106        struct aio_ring_info *info = &ctx->ring_info;
 107        unsigned nr_events = ctx->max_reqs;
 108        unsigned long size;
 109        int nr_pages;
 110
 111        /* Compensate for the ring buffer's head/tail overlap entry */
 112        nr_events += 2; /* 1 is required, 2 for good luck */
 113
 114        size = sizeof(struct aio_ring);
 115        size += sizeof(struct io_event) * nr_events;
 116        nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
 117
 118        if (nr_pages < 0)
 119                return -EINVAL;
 120
 121        info->nr_pages = nr_pages;
 122
 123        nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
 124
 125        info->nr = 0;
 126        info->ring_pages = info->internal_pages;
 127        if (nr_pages > AIO_RING_PAGES) {
 128                info->ring_pages = kmalloc(sizeof(struct page *) * nr_pages, GFP_KERNEL);
 129                if (!info->ring_pages)
 130                        return -ENOMEM;
 131                memset(info->ring_pages, 0, sizeof(struct page *) * nr_pages);
 132        }
 133
 134        info->mmap_size = nr_pages * PAGE_SIZE;
 135        dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
 136        down_write(&ctx->mm->mmap_sem);
 137        info->mmap_base = do_mmap(NULL, 0, info->mmap_size, 
 138                                  PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE,
 139                                  0);
 140        if (IS_ERR((void *)info->mmap_base)) {
 141                up_write(&ctx->mm->mmap_sem);
 142                printk("mmap err: %ld\n", -info->mmap_base);
 143                info->mmap_size = 0;
 144                aio_free_ring(ctx);
 145                return -EAGAIN;
 146        }
 147
 148        dprintk("mmap address: 0x%08lx\n", info->mmap_base);
 149        info->nr_pages = get_user_pages(current, ctx->mm,
 150                                        info->mmap_base, nr_pages, 
 151                                        1, 0, info->ring_pages, NULL);
 152        up_write(&ctx->mm->mmap_sem);
 153
 154        if (unlikely(info->nr_pages != nr_pages)) {
 155                aio_free_ring(ctx);
 156                return -EAGAIN;
 157        }
 158
 159        ctx->user_id = info->mmap_base;
 160
 161        info->nr = nr_events;           /* trusted copy */
 162
 163        ring = kmap_atomic(info->ring_pages[0], KM_USER0);
 164        ring->nr = nr_events;   /* user copy */
 165        ring->id = ctx->user_id;
 166        ring->head = ring->tail = 0;
 167        ring->magic = AIO_RING_MAGIC;
 168        ring->compat_features = AIO_RING_COMPAT_FEATURES;
 169        ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
 170        ring->header_length = sizeof(struct aio_ring);
 171        kunmap_atomic(ring, KM_USER0);
 172
 173        return 0;
 174}
 175
 176
 177/* aio_ring_event: returns a pointer to the event at the given index from
 178 * kmap_atomic(, km).  Release the pointer with put_aio_ring_event();
 179 */
 180#define AIO_EVENTS_PER_PAGE     (PAGE_SIZE / sizeof(struct io_event))
 181#define AIO_EVENTS_FIRST_PAGE   ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 182#define AIO_EVENTS_OFFSET       (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 183
 184#define aio_ring_event(info, nr, km) ({                                 \
 185        unsigned pos = (nr) + AIO_EVENTS_OFFSET;                        \
 186        struct io_event *__event;                                       \
 187        __event = kmap_atomic(                                          \
 188                        (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE], km); \
 189        __event += pos % AIO_EVENTS_PER_PAGE;                           \
 190        __event;                                                        \
 191})
 192
 193#define put_aio_ring_event(event, km) do {      \
 194        struct io_event *__event = (event);     \
 195        (void)__event;                          \
 196        kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
 197} while(0)
 198
 199/* ioctx_alloc
 200 *      Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
 201 */
 202static struct kioctx *ioctx_alloc(unsigned nr_events)
 203{
 204        struct mm_struct *mm;
 205        struct kioctx *ctx;
 206
 207        /* Prevent overflows */
 208        if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
 209            (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
 210                pr_debug("ENOMEM: nr_events too high\n");
 211                return ERR_PTR(-EINVAL);
 212        }
 213
 214        if (nr_events > aio_max_nr)
 215                return ERR_PTR(-EAGAIN);
 216
 217        ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL);
 218        if (!ctx)
 219                return ERR_PTR(-ENOMEM);
 220
 221        memset(ctx, 0, sizeof(*ctx));
 222        ctx->max_reqs = nr_events;
 223        mm = ctx->mm = current->mm;
 224        atomic_inc(&mm->mm_count);
 225
 226        atomic_set(&ctx->users, 1);
 227        spin_lock_init(&ctx->ctx_lock);
 228        spin_lock_init(&ctx->ring_info.ring_lock);
 229        init_waitqueue_head(&ctx->wait);
 230
 231        INIT_LIST_HEAD(&ctx->active_reqs);
 232        INIT_LIST_HEAD(&ctx->run_list);
 233        INIT_WORK(&ctx->wq, aio_kick_handler, ctx);
 234
 235        if (aio_setup_ring(ctx) < 0)
 236                goto out_freectx;
 237
 238        /* limit the number of system wide aios */
 239        atomic_add(ctx->max_reqs, &aio_nr);     /* undone by __put_ioctx */
 240        if (unlikely(atomic_read(&aio_nr) > aio_max_nr))
 241                goto out_cleanup;
 242
 243        /* now link into global list.  kludge.  FIXME */
 244        write_lock(&mm->ioctx_list_lock);
 245        ctx->next = mm->ioctx_list;
 246        mm->ioctx_list = ctx;
 247        write_unlock(&mm->ioctx_list_lock);
 248
 249        dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 250                ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
 251        return ctx;
 252
 253out_cleanup:
 254        atomic_sub(ctx->max_reqs, &aio_nr);
 255        ctx->max_reqs = 0;      /* prevent __put_ioctx from sub'ing aio_nr */
 256        __put_ioctx(ctx);
 257        return ERR_PTR(-EAGAIN);
 258
 259out_freectx:
 260        mmdrop(mm);
 261        kmem_cache_free(kioctx_cachep, ctx);
 262        ctx = ERR_PTR(-ENOMEM);
 263
 264        dprintk("aio: error allocating ioctx %p\n", ctx);
 265        return ctx;
 266}
 267
 268/* aio_cancel_all
 269 *      Cancels all outstanding aio requests on an aio context.  Used 
 270 *      when the processes owning a context have all exited to encourage 
 271 *      the rapid destruction of the kioctx.
 272 */
 273static void aio_cancel_all(struct kioctx *ctx)
 274{
 275        int (*cancel)(struct kiocb *, struct io_event *);
 276        struct io_event res;
 277        spin_lock_irq(&ctx->ctx_lock);
 278        ctx->dead = 1;
 279        while (!list_empty(&ctx->active_reqs)) {
 280                struct list_head *pos = ctx->active_reqs.next;
 281                struct kiocb *iocb = list_kiocb(pos);
 282                list_del_init(&iocb->ki_list);
 283                cancel = iocb->ki_cancel;
 284                if (cancel) {
 285                        iocb->ki_users++;
 286                        spin_unlock_irq(&ctx->ctx_lock);
 287                        cancel(iocb, &res);
 288                        spin_lock_irq(&ctx->ctx_lock);
 289                }
 290        }
 291        spin_unlock_irq(&ctx->ctx_lock);
 292}
 293
 294void wait_for_all_aios(struct kioctx *ctx)
 295{
 296        struct task_struct *tsk = current;
 297        DECLARE_WAITQUEUE(wait, tsk);
 298
 299        if (!ctx->reqs_active)
 300                return;
 301
 302        add_wait_queue(&ctx->wait, &wait);
 303        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 304        while (ctx->reqs_active) {
 305                schedule();
 306                set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 307        }
 308        __set_task_state(tsk, TASK_RUNNING);
 309        remove_wait_queue(&ctx->wait, &wait);
 310}
 311
 312/* wait_on_sync_kiocb:
 313 *      Waits on the given sync kiocb to complete.
 314 */
 315ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 316{
 317        while (iocb->ki_users) {
 318                set_current_state(TASK_UNINTERRUPTIBLE);
 319                if (!iocb->ki_users)
 320                        break;
 321                schedule();
 322        }
 323        __set_current_state(TASK_RUNNING);
 324        return iocb->ki_user_data;
 325}
 326
 327/* exit_aio: called when the last user of mm goes away.  At this point, 
 328 * there is no way for any new requests to be submited or any of the 
 329 * io_* syscalls to be called on the context.  However, there may be 
 330 * outstanding requests which hold references to the context; as they 
 331 * go away, they will call put_ioctx and release any pinned memory
 332 * associated with the request (held via struct page * references).
 333 */
 334void exit_aio(struct mm_struct *mm)
 335{
 336        struct kioctx *ctx = mm->ioctx_list;
 337        mm->ioctx_list = NULL;
 338        while (ctx) {
 339                struct kioctx *next = ctx->next;
 340                ctx->next = NULL;
 341                aio_cancel_all(ctx);
 342
 343                wait_for_all_aios(ctx);
 344
 345                if (1 != atomic_read(&ctx->users))
 346                        printk(KERN_DEBUG
 347                                "exit_aio:ioctx still alive: %d %d %d\n",
 348                                atomic_read(&ctx->users), ctx->dead,
 349                                ctx->reqs_active);
 350                put_ioctx(ctx);
 351                ctx = next;
 352        }
 353}
 354
 355/* __put_ioctx
 356 *      Called when the last user of an aio context has gone away,
 357 *      and the struct needs to be freed.
 358 */
 359void __put_ioctx(struct kioctx *ctx)
 360{
 361        unsigned nr_events = ctx->max_reqs;
 362
 363        if (unlikely(ctx->reqs_active))
 364                BUG();
 365
 366        aio_free_ring(ctx);
 367        mmdrop(ctx->mm);
 368        ctx->mm = NULL;
 369        pr_debug("__put_ioctx: freeing %p\n", ctx);
 370        kmem_cache_free(kioctx_cachep, ctx);
 371
 372        atomic_sub(nr_events, &aio_nr);
 373}
 374
 375/* aio_get_req
 376 *      Allocate a slot for an aio request.  Increments the users count
 377 * of the kioctx so that the kioctx stays around until all requests are
 378 * complete.  Returns NULL if no requests are free.
 379 *
 380 * Returns with kiocb->users set to 2.  The io submit code path holds
 381 * an extra reference while submitting the i/o.
 382 * This prevents races between the aio code path referencing the
 383 * req (after submitting it) and aio_complete() freeing the req.
 384 */
 385static struct kiocb *FASTCALL(__aio_get_req(struct kioctx *ctx));
 386static struct kiocb *__aio_get_req(struct kioctx *ctx)
 387{
 388        struct kiocb *req = NULL;
 389        struct aio_ring *ring;
 390        int okay = 0;
 391
 392        req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
 393        if (unlikely(!req))
 394                return NULL;
 395
 396        req->ki_flags = 1 << KIF_LOCKED;
 397        req->ki_users = 2;
 398        req->ki_key = 0;
 399        req->ki_ctx = ctx;
 400        req->ki_cancel = NULL;
 401        req->ki_retry = NULL;
 402        req->ki_user_obj = NULL;
 403
 404        /* Check if the completion queue has enough free space to
 405         * accept an event from this io.
 406         */
 407        spin_lock_irq(&ctx->ctx_lock);
 408        ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
 409        if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
 410                list_add(&req->ki_list, &ctx->active_reqs);
 411                get_ioctx(ctx);
 412                ctx->reqs_active++;
 413                okay = 1;
 414        }
 415        kunmap_atomic(ring, KM_USER0);
 416        spin_unlock_irq(&ctx->ctx_lock);
 417
 418        if (!okay) {
 419                kmem_cache_free(kiocb_cachep, req);
 420                req = NULL;
 421        }
 422
 423        return req;
 424}
 425
 426static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 427{
 428        struct kiocb *req;
 429        /* Handle a potential starvation case -- should be exceedingly rare as 
 430         * requests will be stuck on fput_head only if the aio_fput_routine is 
 431         * delayed and the requests were the last user of the struct file.
 432         */
 433        req = __aio_get_req(ctx);
 434        if (unlikely(NULL == req)) {
 435                aio_fput_routine(NULL);
 436                req = __aio_get_req(ctx);
 437        }
 438        return req;
 439}
 440
 441static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
 442{
 443        req->ki_ctx = NULL;
 444        req->ki_filp = NULL;
 445        req->ki_user_obj = NULL;
 446        kmem_cache_free(kiocb_cachep, req);
 447        ctx->reqs_active--;
 448
 449        if (unlikely(!ctx->reqs_active && ctx->dead))
 450                wake_up(&ctx->wait);
 451}
 452
 453static void aio_fput_routine(void *data)
 454{
 455        spin_lock_irq(&fput_lock);
 456        while (likely(!list_empty(&fput_head))) {
 457                struct kiocb *req = list_kiocb(fput_head.next);
 458                struct kioctx *ctx = req->ki_ctx;
 459
 460                list_del(&req->ki_list);
 461                spin_unlock_irq(&fput_lock);
 462
 463                /* Complete the fput */
 464                __fput(req->ki_filp);
 465
 466                /* Link the iocb into the context's free list */
 467                spin_lock_irq(&ctx->ctx_lock);
 468                really_put_req(ctx, req);
 469                spin_unlock_irq(&ctx->ctx_lock);
 470
 471                put_ioctx(ctx);
 472                spin_lock_irq(&fput_lock);
 473        }
 474        spin_unlock_irq(&fput_lock);
 475}
 476
 477/* __aio_put_req
 478 *      Returns true if this put was the last user of the request.
 479 */
 480static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
 481{
 482        dprintk(KERN_DEBUG "aio_put(%p): f_count=%d\n",
 483                req, atomic_read(&req->ki_filp->f_count));
 484
 485        req->ki_users --;
 486        if (unlikely(req->ki_users < 0))
 487                BUG();
 488        if (likely(req->ki_users))
 489                return 0;
 490        list_del(&req->ki_list);                /* remove from active_reqs */
 491        req->ki_cancel = NULL;
 492        req->ki_retry = NULL;
 493
 494        /* Must be done under the lock to serialise against cancellation.
 495         * Call this aio_fput as it duplicates fput via the fput_work.
 496         */
 497        if (unlikely(atomic_dec_and_test(&req->ki_filp->f_count))) {
 498                get_ioctx(ctx);
 499                spin_lock(&fput_lock);
 500                list_add(&req->ki_list, &fput_head);
 501                spin_unlock(&fput_lock);
 502                queue_work(aio_wq, &fput_work);
 503        } else
 504                really_put_req(ctx, req);
 505        return 1;
 506}
 507
 508/* aio_put_req
 509 *      Returns true if this put was the last user of the kiocb,
 510 *      false if the request is still in use.
 511 */
 512int aio_put_req(struct kiocb *req)
 513{
 514        struct kioctx *ctx = req->ki_ctx;
 515        int ret;
 516        spin_lock_irq(&ctx->ctx_lock);
 517        ret = __aio_put_req(ctx, req);
 518        spin_unlock_irq(&ctx->ctx_lock);
 519        if (ret)
 520                put_ioctx(ctx);
 521        return ret;
 522}
 523
 524/*      Lookup an ioctx id.  ioctx_list is lockless for reads.
 525 *      FIXME: this is O(n) and is only suitable for development.
 526 */
 527struct kioctx *lookup_ioctx(unsigned long ctx_id)
 528{
 529        struct kioctx *ioctx;
 530        struct mm_struct *mm;
 531
 532        mm = current->mm;
 533        read_lock(&mm->ioctx_list_lock);
 534        for (ioctx = mm->ioctx_list; ioctx; ioctx = ioctx->next)
 535                if (likely(ioctx->user_id == ctx_id && !ioctx->dead)) {
 536                        get_ioctx(ioctx);
 537                        break;
 538                }
 539        read_unlock(&mm->ioctx_list_lock);
 540
 541        return ioctx;
 542}
 543
 544static void use_mm(struct mm_struct *mm)
 545{
 546        struct mm_struct *active_mm = current->active_mm;
 547        atomic_inc(&mm->mm_count);
 548        current->mm = mm;
 549        if (mm != active_mm) {
 550                current->active_mm = mm;
 551                activate_mm(active_mm, mm);
 552        }
 553        mmdrop(active_mm);
 554}
 555
 556static void unuse_mm(struct mm_struct *mm)
 557{
 558        current->mm = NULL;
 559        /* active_mm is still 'mm' */
 560        enter_lazy_tlb(mm, current);
 561}
 562
 563/* Run on kevent's context.  FIXME: needs to be per-cpu and warn if an
 564 * operation blocks.
 565 */
 566static void aio_kick_handler(void *data)
 567{
 568        struct kioctx *ctx = data;
 569
 570        use_mm(ctx->mm);
 571
 572        spin_lock_irq(&ctx->ctx_lock);
 573        while (!list_empty(&ctx->run_list)) {
 574                struct kiocb *iocb;
 575                long ret;
 576
 577                iocb = list_entry(ctx->run_list.next, struct kiocb,
 578                                  ki_run_list);
 579                list_del(&iocb->ki_run_list);
 580                iocb->ki_users ++;
 581                spin_unlock_irq(&ctx->ctx_lock);
 582
 583                kiocbClearKicked(iocb);
 584                ret = iocb->ki_retry(iocb);
 585                if (-EIOCBQUEUED != ret) {
 586                        aio_complete(iocb, ret, 0);
 587                        iocb = NULL;
 588                }
 589
 590                spin_lock_irq(&ctx->ctx_lock);
 591                if (NULL != iocb)
 592                        __aio_put_req(ctx, iocb);
 593        }
 594        spin_unlock_irq(&ctx->ctx_lock);
 595
 596        unuse_mm(ctx->mm);
 597}
 598
 599void kick_iocb(struct kiocb *iocb)
 600{
 601        struct kioctx   *ctx = iocb->ki_ctx;
 602
 603        /* sync iocbs are easy: they can only ever be executing from a 
 604         * single context. */
 605        if (is_sync_kiocb(iocb)) {
 606                kiocbSetKicked(iocb);
 607                wake_up_process(iocb->ki_user_obj);
 608                return;
 609        }
 610
 611        if (!kiocbTryKick(iocb)) {
 612                unsigned long flags;
 613                spin_lock_irqsave(&ctx->ctx_lock, flags);
 614                list_add_tail(&iocb->ki_run_list, &ctx->run_list);
 615                spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 616                schedule_work(&ctx->wq);
 617        }
 618}
 619
 620/* aio_complete
 621 *      Called when the io request on the given iocb is complete.
 622 *      Returns true if this is the last user of the request.  The 
 623 *      only other user of the request can be the cancellation code.
 624 */
 625int aio_complete(struct kiocb *iocb, long res, long res2)
 626{
 627        struct kioctx   *ctx = iocb->ki_ctx;
 628        struct aio_ring_info    *info;
 629        struct aio_ring *ring;
 630        struct io_event *event;
 631        unsigned long   flags;
 632        unsigned long   tail;
 633        int             ret;
 634
 635        /* Special case handling for sync iocbs: events go directly
 636         * into the iocb for fast handling.  Note that this will not 
 637         * work if we allow sync kiocbs to be cancelled. in which
 638         * case the usage count checks will have to move under ctx_lock
 639         * for all cases.
 640         */
 641        if (is_sync_kiocb(iocb)) {
 642                int ret;
 643
 644                iocb->ki_user_data = res;
 645                if (iocb->ki_users == 1) {
 646                        iocb->ki_users = 0;
 647                        ret = 1;
 648                } else {
 649                        spin_lock_irq(&ctx->ctx_lock);
 650                        iocb->ki_users--;
 651                        ret = (0 == iocb->ki_users);
 652                        spin_unlock_irq(&ctx->ctx_lock);
 653                }
 654                /* sync iocbs put the task here for us */
 655                wake_up_process(iocb->ki_user_obj);
 656                return ret;
 657        }
 658
 659        info = &ctx->ring_info;
 660
 661        /* add a completion event to the ring buffer.
 662         * must be done holding ctx->ctx_lock to prevent
 663         * other code from messing with the tail
 664         * pointer since we might be called from irq
 665         * context.
 666         */
 667        spin_lock_irqsave(&ctx->ctx_lock, flags);
 668
 669        ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);
 670
 671        tail = info->tail;
 672        event = aio_ring_event(info, tail, KM_IRQ0);
 673        tail = (tail + 1) % info->nr;
 674
 675        event->obj = (u64)(unsigned long)iocb->ki_user_obj;
 676        event->data = iocb->ki_user_data;
 677        event->res = res;
 678        event->res2 = res2;
 679
 680        dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n",
 681                ctx, tail, iocb, iocb->ki_user_obj, iocb->ki_user_data,
 682                res, res2);
 683
 684        /* after flagging the request as done, we
 685         * must never even look at it again
 686         */
 687        smp_wmb();      /* make event visible before updating tail */
 688
 689        info->tail = tail;
 690        ring->tail = tail;
 691
 692        put_aio_ring_event(event, KM_IRQ0);
 693        kunmap_atomic(ring, KM_IRQ1);
 694
 695        pr_debug("added to ring %p at [%lu]\n", iocb, tail);
 696
 697        /* everything turned out well, dispose of the aiocb. */
 698        ret = __aio_put_req(ctx, iocb);
 699
 700        spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 701
 702        if (waitqueue_active(&ctx->wait))
 703                wake_up(&ctx->wait);
 704
 705        if (ret)
 706                put_ioctx(ctx);
 707
 708        return ret;
 709}
 710
 711/* aio_read_evt
 712 *      Pull an event off of the ioctx's event ring.  Returns the number of 
 713 *      events fetched (0 or 1 ;-)
 714 *      FIXME: make this use cmpxchg.
 715 *      TODO: make the ringbuffer user mmap()able (requires FIXME).
 716 */
 717static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
 718{
 719        struct aio_ring_info *info = &ioctx->ring_info;
 720        struct aio_ring *ring;
 721        unsigned long head;
 722        int ret = 0;
 723
 724        ring = kmap_atomic(info->ring_pages[0], KM_USER0);
 725        dprintk("in aio_read_evt h%lu t%lu m%lu\n",
 726                 (unsigned long)ring->head, (unsigned long)ring->tail,
 727                 (unsigned long)ring->nr);
 728
 729        if (ring->head == ring->tail)
 730                goto out;
 731
 732        spin_lock(&info->ring_lock);
 733
 734        head = ring->head % info->nr;
 735        if (head != ring->tail) {
 736                struct io_event *evp = aio_ring_event(info, head, KM_USER1);
 737                *ent = *evp;
 738                head = (head + 1) % info->nr;
 739                smp_mb(); /* finish reading the event before updatng the head */
 740                ring->head = head;
 741                ret = 1;
 742                put_aio_ring_event(evp, KM_USER1);
 743        }
 744        spin_unlock(&info->ring_lock);
 745
 746out:
 747        kunmap_atomic(ring, KM_USER0);
 748        dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
 749                 (unsigned long)ring->head, (unsigned long)ring->tail);
 750        return ret;
 751}
 752
 753struct timeout {
 754        struct timer_list       timer;
 755        int                     timed_out;
 756        struct task_struct      *p;
 757};
 758
 759static void timeout_func(unsigned long data)
 760{
 761        struct timeout *to = (struct timeout *)data;
 762
 763        to->timed_out = 1;
 764        wake_up_process(to->p);
 765}
 766
 767static inline void init_timeout(struct timeout *to)
 768{
 769        init_timer(&to->timer);
 770        to->timer.data = (unsigned long)to;
 771        to->timer.function = timeout_func;
 772        to->timed_out = 0;
 773        to->p = current;
 774}
 775
 776static inline void set_timeout(long start_jiffies, struct timeout *to,
 777                               const struct timespec *ts)
 778{
 779        unsigned long how_long;
 780
 781        if (ts->tv_sec < 0 || (!ts->tv_sec && !ts->tv_nsec)) {
 782                to->timed_out = 1;
 783                return;
 784        }
 785
 786        how_long = ts->tv_sec * HZ;
 787#define HZ_NS (1000000000 / HZ)
 788        how_long += (ts->tv_nsec + HZ_NS - 1) / HZ_NS;
 789        
 790        to->timer.expires = jiffies + how_long;
 791        add_timer(&to->timer);
 792}
 793
 794static inline void clear_timeout(struct timeout *to)
 795{
 796        del_timer_sync(&to->timer);
 797}
 798
 799static int read_events(struct kioctx *ctx,
 800                        long min_nr, long nr,
 801                        struct io_event *event,
 802                        struct timespec *timeout)
 803{
 804        long                    start_jiffies = jiffies;
 805        struct task_struct      *tsk = current;
 806        DECLARE_WAITQUEUE(wait, tsk);
 807        int                     ret;
 808        int                     i = 0;
 809        struct io_event         ent;
 810        struct timeout          to;
 811
 812        /* needed to zero any padding within an entry (there shouldn't be 
 813         * any, but C is fun!
 814         */
 815        memset(&ent, 0, sizeof(ent));
 816        ret = 0;
 817
 818        while (likely(i < nr)) {
 819                ret = aio_read_evt(ctx, &ent);
 820                if (unlikely(ret <= 0))
 821                        break;
 822
 823                dprintk("read event: %Lx %Lx %Lx %Lx\n",
 824                        ent.data, ent.obj, ent.res, ent.res2);
 825
 826                /* Could we split the check in two? */
 827                ret = -EFAULT;
 828                if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
 829                        dprintk("aio: lost an event due to EFAULT.\n");
 830                        break;
 831                }
 832                ret = 0;
 833
 834                /* Good, event copied to userland, update counts. */
 835                event ++;
 836                i ++;
 837        }
 838
 839        if (min_nr <= i)
 840                return i;
 841        if (ret)
 842                return ret;
 843
 844        /* End fast path */
 845
 846        init_timeout(&to);
 847        if (timeout) {
 848                struct timespec ts;
 849                ret = -EFAULT;
 850                if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
 851                        goto out;
 852
 853                set_timeout(start_jiffies, &to, &ts);
 854        }
 855
 856        while (likely(i < nr)) {
 857                add_wait_queue_exclusive(&ctx->wait, &wait);
 858                do {
 859                        set_task_state(tsk, TASK_INTERRUPTIBLE);
 860
 861                        ret = aio_read_evt(ctx, &ent);
 862                        if (ret)
 863                                break;
 864                        if (min_nr <= i)
 865                                break;
 866                        ret = 0;
 867                        if (to.timed_out)       /* Only check after read evt */
 868                                break;
 869                        schedule();
 870                        if (signal_pending(tsk)) {
 871                                ret = -EINTR;
 872                                break;
 873                        }
 874                        /*ret = aio_read_evt(ctx, &ent);*/
 875                } while (1) ;
 876
 877                set_task_state(tsk, TASK_RUNNING);
 878                remove_wait_queue(&ctx->wait, &wait);
 879
 880                if (unlikely(ret <= 0))
 881                        break;
 882
 883                ret = -EFAULT;
 884                if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
 885                        dprintk("aio: lost an event due to EFAULT.\n");
 886                        break;
 887                }
 888
 889                /* Good, event copied to userland, update counts. */
 890                event ++;
 891                i ++;
 892        }
 893
 894        if (timeout)
 895                clear_timeout(&to);
 896out:
 897        return i ? i : ret;
 898}
 899
 900/* Take an ioctx and remove it from the list of ioctx's.  Protects 
 901 * against races with itself via ->dead.
 902 */
 903static void io_destroy(struct kioctx *ioctx)
 904{
 905        struct mm_struct *mm = current->mm;
 906        struct kioctx **tmp;
 907        int was_dead;
 908
 909        /* delete the entry from the list is someone else hasn't already */
 910        write_lock(&mm->ioctx_list_lock);
 911        was_dead = ioctx->dead;
 912        ioctx->dead = 1;
 913        for (tmp = &mm->ioctx_list; *tmp && *tmp != ioctx;
 914             tmp = &(*tmp)->next)
 915                ;
 916        if (*tmp)
 917                *tmp = ioctx->next;
 918        write_unlock(&mm->ioctx_list_lock);
 919
 920        dprintk("aio_release(%p)\n", ioctx);
 921        if (likely(!was_dead))
 922                put_ioctx(ioctx);       /* twice for the list */
 923
 924        aio_cancel_all(ioctx);
 925        wait_for_all_aios(ioctx);
 926        put_ioctx(ioctx);       /* once for the lookup */
 927}
 928
 929/* sys_io_setup:
 930 *      Create an aio_context capable of receiving at least nr_events.
 931 *      ctxp must not point to an aio_context that already exists, and
 932 *      must be initialized to 0 prior to the call.  On successful
 933 *      creation of the aio_context, *ctxp is filled in with the resulting 
 934 *      handle.  May fail with -EINVAL if *ctxp is not initialized,
 935 *      if the specified nr_events exceeds internal limits.  May fail 
 936 *      with -EAGAIN if the specified nr_events exceeds the user's limit 
 937 *      of available events.  May fail with -ENOMEM if insufficient kernel
 938 *      resources are available.  May fail with -EFAULT if an invalid
 939 *      pointer is passed for ctxp.  Will fail with -ENOSYS if not
 940 *      implemented.
 941 */
 942asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t *ctxp)
 943{
 944        struct kioctx *ioctx = NULL;
 945        unsigned long ctx;
 946        long ret;
 947
 948        ret = get_user(ctx, ctxp);
 949        if (unlikely(ret))
 950                goto out;
 951
 952        ret = -EINVAL;
 953        if (unlikely(ctx || (int)nr_events <= 0)) {
 954                pr_debug("EINVAL: io_setup: ctx or nr_events > max\n");
 955                goto out;
 956        }
 957
 958        ioctx = ioctx_alloc(nr_events);
 959        ret = PTR_ERR(ioctx);
 960        if (!IS_ERR(ioctx)) {
 961                ret = put_user(ioctx->user_id, ctxp);
 962                if (!ret)
 963                        return 0;
 964                io_destroy(ioctx);
 965        }
 966
 967out:
 968        return ret;
 969}
 970
 971/* sys_io_destroy:
 972 *      Destroy the aio_context specified.  May cancel any outstanding 
 973 *      AIOs and block on completion.  Will fail with -ENOSYS if not
 974 *      implemented.  May fail with -EFAULT if the context pointed to
 975 *      is invalid.
 976 */
 977asmlinkage long sys_io_destroy(aio_context_t ctx)
 978{
 979        struct kioctx *ioctx = lookup_ioctx(ctx);
 980        if (likely(NULL != ioctx)) {
 981                io_destroy(ioctx);
 982                return 0;
 983        }
 984        pr_debug("EINVAL: io_destroy: invalid context id\n");
 985        return -EINVAL;
 986}
 987
 988int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 989                         struct iocb *iocb)
 990{
 991        struct kiocb *req;
 992        struct file *file;
 993        ssize_t ret;
 994        char *buf;
 995
 996        /* enforce forwards compatibility on users */
 997        if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
 998                     iocb->aio_reserved3)) {
 999                pr_debug("EINVAL: io_submit: reserve field set\n");
1000                return -EINVAL;
1001        }
1002
1003        /* prevent overflows */
1004        if (unlikely(
1005            (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
1006            (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
1007            ((ssize_t)iocb->aio_nbytes < 0)
1008           )) {
1009                pr_debug("EINVAL: io_submit: overflow check\n");
1010                return -EINVAL;
1011        }
1012
1013        file = fget(iocb->aio_fildes);
1014        if (unlikely(!file))
1015                return -EBADF;
1016
1017        req = aio_get_req(ctx);         /* returns with 2 references to req */
1018        if (unlikely(!req)) {
1019                fput(file);
1020                return -EAGAIN;
1021        }
1022
1023        req->ki_filp = file;
1024        iocb->aio_key = req->ki_key;
1025        ret = put_user(iocb->aio_key, &user_iocb->aio_key);
1026        if (unlikely(ret)) {
1027                dprintk("EFAULT: aio_key\n");
1028                goto out_put_req;
1029        }
1030
1031        req->ki_user_obj = user_iocb;
1032        req->ki_user_data = iocb->aio_data;
1033        req->ki_pos = iocb->aio_offset;
1034
1035        buf = (char *)(unsigned long)iocb->aio_buf;
1036
1037        switch (iocb->aio_lio_opcode) {
1038        case IOCB_CMD_PREAD:
1039                ret = -EBADF;
1040                if (unlikely(!(file->f_mode & FMODE_READ)))
1041                        goto out_put_req;
1042                ret = -EFAULT;
1043                if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
1044                        goto out_put_req;
1045                ret = -EINVAL;
1046                if (file->f_op->aio_read)
1047                        ret = file->f_op->aio_read(req, buf,
1048                                        iocb->aio_nbytes, req->ki_pos);
1049                break;
1050        case IOCB_CMD_PWRITE:
1051                ret = -EBADF;
1052                if (unlikely(!(file->f_mode & FMODE_WRITE)))
1053                        goto out_put_req;
1054                ret = -EFAULT;
1055                if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
1056                        goto out_put_req;
1057                ret = -EINVAL;
1058                if (file->f_op->aio_write)
1059                        ret = file->f_op->aio_write(req, buf,
1060                                        iocb->aio_nbytes, req->ki_pos);
1061                break;
1062        case IOCB_CMD_FDSYNC:
1063                ret = -EINVAL;
1064                if (file->f_op->aio_fsync)
1065                        ret = file->f_op->aio_fsync(req, 1);
1066                break;
1067        case IOCB_CMD_FSYNC:
1068                ret = -EINVAL;
1069                if (file->f_op->aio_fsync)
1070                        ret = file->f_op->aio_fsync(req, 0);
1071                break;
1072        default:
1073                dprintk("EINVAL: io_submit: no operation provided\n");
1074                ret = -EINVAL;
1075        }
1076
1077        aio_put_req(req);       /* drop extra ref to req */
1078        if (likely(-EIOCBQUEUED == ret))
1079                return 0;
1080        aio_complete(req, ret, 0);      /* will drop i/o ref to req */
1081        return 0;
1082
1083out_put_req:
1084        aio_put_req(req);       /* drop extra ref to req */
1085        aio_put_req(req);       /* drop i/o ref to req */
1086        return ret;
1087}
1088
1089/* sys_io_submit:
1090 *      Queue the nr iocbs pointed to by iocbpp for processing.  Returns
1091 *      the number of iocbs queued.  May return -EINVAL if the aio_context
1092 *      specified by ctx_id is invalid, if nr is < 0, if the iocb at
1093 *      *iocbpp[0] is not properly initialized, if the operation specified
1094 *      is invalid for the file descriptor in the iocb.  May fail with
1095 *      -EFAULT if any of the data structures point to invalid data.  May
1096 *      fail with -EBADF if the file descriptor specified in the first
1097 *      iocb is invalid.  May fail with -EAGAIN if insufficient resources
1098 *      are available to queue any iocbs.  Will return 0 if nr is 0.  Will
1099 *      fail with -ENOSYS if not implemented.
1100 */
1101asmlinkage long sys_io_submit(aio_context_t ctx_id, long nr,
1102                              struct iocb __user **iocbpp)
1103{
1104        struct kioctx *ctx;
1105        long ret = 0;
1106        int i;
1107
1108        if (unlikely(nr < 0))
1109                return -EINVAL;
1110
1111        if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
1112                return -EFAULT;
1113
1114        ctx = lookup_ioctx(ctx_id);
1115        if (unlikely(!ctx)) {
1116                pr_debug("EINVAL: io_submit: invalid context id\n");
1117                return -EINVAL;
1118        }
1119
1120        /*
1121         * AKPM: should this return a partial result if some of the IOs were
1122         * successfully submitted?
1123         */
1124        for (i=0; i<nr; i++) {
1125                struct iocb __user *user_iocb;
1126                struct iocb tmp;
1127
1128                if (unlikely(__get_user(user_iocb, iocbpp + i))) {
1129                        ret = -EFAULT;
1130                        break;
1131                }
1132
1133                if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
1134                        ret = -EFAULT;
1135                        break;
1136                }
1137
1138                ret = io_submit_one(ctx, user_iocb, &tmp);
1139                if (ret)
1140                        break;
1141        }
1142
1143        put_ioctx(ctx);
1144        return i ? i : ret;
1145}
1146
1147/* lookup_kiocb
1148 *      Finds a given iocb for cancellation.
1149 *      MUST be called with ctx->ctx_lock held.
1150 */
1151struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb *iocb, u32 key)
1152{
1153        struct list_head *pos;
1154        /* TODO: use a hash or array, this sucks. */
1155        list_for_each(pos, &ctx->active_reqs) {
1156                struct kiocb *kiocb = list_kiocb(pos);
1157                if (kiocb->ki_user_obj == iocb && kiocb->ki_key == key)
1158                        return kiocb;
1159        }
1160        return NULL;
1161}
1162
1163/* sys_io_cancel:
1164 *      Attempts to cancel an iocb previously passed to io_submit.  If
1165 *      the operation is successfully cancelled, the resulting event is
1166 *      copied into the memory pointed to by result without being placed
1167 *      into the completion queue and 0 is returned.  May fail with
1168 *      -EFAULT if any of the data structures pointed to are invalid.
1169 *      May fail with -EINVAL if aio_context specified by ctx_id is
1170 *      invalid.  May fail with -EAGAIN if the iocb specified was not
1171 *      cancelled.  Will fail with -ENOSYS if not implemented.
1172 */
1173asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb *iocb,
1174                              struct io_event *result)
1175{
1176        int (*cancel)(struct kiocb *iocb, struct io_event *res);
1177        struct kioctx *ctx;
1178        struct kiocb *kiocb;
1179        u32 key;
1180        int ret;
1181
1182        ret = get_user(key, &iocb->aio_key);
1183        if (unlikely(ret))
1184                return -EFAULT;
1185
1186        ctx = lookup_ioctx(ctx_id);
1187        if (unlikely(!ctx))
1188                return -EINVAL;
1189
1190        spin_lock_irq(&ctx->ctx_lock);
1191        ret = -EAGAIN;
1192        kiocb = lookup_kiocb(ctx, iocb, key);
1193        if (kiocb && kiocb->ki_cancel) {
1194                cancel = kiocb->ki_cancel;
1195                kiocb->ki_users ++;
1196        } else
1197                cancel = NULL;
1198        spin_unlock_irq(&ctx->ctx_lock);
1199
1200        if (NULL != cancel) {
1201                struct io_event tmp;
1202                pr_debug("calling cancel\n");
1203                memset(&tmp, 0, sizeof(tmp));
1204                tmp.obj = (u64)(unsigned long)kiocb->ki_user_obj;
1205                tmp.data = kiocb->ki_user_data;
1206                ret = cancel(kiocb, &tmp);
1207                if (!ret) {
1208                        /* Cancellation succeeded -- copy the result
1209                         * into the user's buffer.
1210                         */
1211                        if (copy_to_user(result, &tmp, sizeof(tmp)))
1212                                ret = -EFAULT;
1213                }
1214        } else
1215                printk(KERN_DEBUG "iocb has no cancel operation\n");
1216
1217        put_ioctx(ctx);
1218
1219        return ret;
1220}
1221
1222/* io_getevents:
1223 *      Attempts to read at least min_nr events and up to nr events from
1224 *      the completion queue for the aio_context specified by ctx_id.  May
1225 *      fail with -EINVAL if ctx_id is invalid, if min_nr is out of range,
1226 *      if nr is out of range, if when is out of range.  May fail with
1227 *      -EFAULT if any of the memory specified to is invalid.  May return
1228 *      0 or < min_nr if no events are available and the timeout specified
1229 *      by when has elapsed, where when == NULL specifies an infinite
1230 *      timeout.  Note that the timeout pointed to by when is relative and
1231 *      will be updated if not NULL and the operation blocks.  Will fail
1232 *      with -ENOSYS if not implemented.
1233 */
1234asmlinkage long sys_io_getevents(aio_context_t ctx_id,
1235                                 long min_nr,
1236                                 long nr,
1237                                 struct io_event *events,
1238                                 struct timespec *timeout)
1239{
1240        struct kioctx *ioctx = lookup_ioctx(ctx_id);
1241        long ret = -EINVAL;
1242
1243        if (unlikely(min_nr > nr || min_nr < 0 || nr < 0))
1244                return ret;
1245
1246        if (likely(NULL != ioctx)) {
1247                ret = read_events(ioctx, min_nr, nr, events, timeout);
1248                put_ioctx(ioctx);
1249        }
1250
1251        return ret;
1252}
1253
/* Register aio_setup() as an initcall. */
__initcall(aio_setup);

/* Entry points exported for use outside this file. */
EXPORT_SYMBOL(aio_complete);
EXPORT_SYMBOL(aio_put_req);
EXPORT_SYMBOL(wait_on_sync_kiocb);
1259
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.