linux/fs/nfs/direct.c
   1/*
   2 * linux/fs/nfs/direct.c
   3 *
   4 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
   5 *
   6 * High-performance uncached I/O for the Linux NFS client
   7 *
   8 * There are important applications whose performance or correctness
   9 * depends on uncached access to file data.  Database clusters
  10 * (multiple copies of the same instance running on separate hosts)
  11 * implement their own cache coherency protocol that subsumes file
  12 * system cache protocols.  Applications that process datasets
  13 * considerably larger than the client's memory do not always benefit
  14 * from a local cache.  A streaming video server, for instance, has no
  15 * need to cache the contents of a file.
  16 *
  17 * When an application requests uncached I/O, all read and write requests
  18 * are made directly to the server; data stored or fetched via these
  19 * requests is not cached in the Linux page cache.  The client does not
  20 * correct unaligned requests from applications.  All requested bytes are
  21 * held on permanent storage before a direct write system call returns to
  22 * an application.
  23 *
  24 * Solaris implements an uncached I/O facility called directio() that
  25 * is used for backups and sequential I/O to very large files.  Solaris
  26 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
  27 * an undocumented mount option.
  28 *
  29 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
  30 * help from Andrew Morton.
  31 *
  32 * 18 Dec 2001  Initial implementation for 2.4  --cel
  33 * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
  34 * 08 Jun 2003  Port to 2.5 APIs  --cel
  35 * 31 Mar 2004  Handle direct I/O without VFS support  --cel
  36 * 15 Sep 2004  Parallel async reads  --cel
  37 * 04 May 2005  support O_DIRECT with aio  --cel
  38 *
  39 */
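/*
 * Example (an illustrative sketch): from user space, uncached NFS I/O is
 * requested by opening the file with O_DIRECT.  Because this client does
 * not correct unaligned requests, the buffer, offset and length should be
 * suitably aligned; the path and the 4096-byte alignment below are
 * assumptions made only for the example:
 *
 *	int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
 *	void *buf;
 *
 *	posix_memalign(&buf, 4096, 4096);
 *	pread(fd, buf, 4096, 0);	// read bypasses the page cache
 *	pwrite(fd, buf, 4096, 0);	// returns once data is on stable storage
 */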
  40
  41#include <linux/errno.h>
  42#include <linux/sched.h>
  43#include <linux/kernel.h>
  44#include <linux/file.h>
  45#include <linux/pagemap.h>
  46#include <linux/kref.h>
  47#include <linux/slab.h>
  48#include <linux/task_io_accounting_ops.h>
  49#include <linux/module.h>
  50
  51#include <linux/nfs_fs.h>
  52#include <linux/nfs_page.h>
  53#include <linux/sunrpc/clnt.h>
  54
  55#include <asm/uaccess.h>
  56#include <linux/atomic.h>
  57
  58#include "internal.h"
  59#include "iostat.h"
  60#include "pnfs.h"
  61
  62#define NFSDBG_FACILITY         NFSDBG_VFS
  63
  64static struct kmem_cache *nfs_direct_cachep;
  65
  66/*
  67 * This represents a set of asynchronous requests that we're waiting on
  68 */
  69struct nfs_direct_req {
  70        struct kref             kref;           /* release manager */
  71
  72        /* I/O parameters */
  73        struct nfs_open_context *ctx;           /* file open context info */
  74        struct nfs_lock_context *l_ctx;         /* Lock context info */
  75        struct kiocb *          iocb;           /* controlling i/o request */
  76        struct inode *          inode;          /* target file of i/o */
  77
  78        /* completion state */
  79        atomic_t                io_count;       /* i/os we're waiting for */
  80        spinlock_t              lock;           /* protect completion state */
  81        ssize_t                 count,          /* bytes actually processed */
  82                                bytes_left,     /* bytes left to be sent */
  83                                error;          /* any reported error */
  84        struct completion       completion;     /* wait for i/o completion */
  85
  86        /* commit state */
  87        struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
  88        struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
  89        struct work_struct      work;
  90        int                     flags;
  91#define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
  92#define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
  93        struct nfs_writeverf    verf;           /* unstable write verifier */
  94};
  95
  96static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
  97static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
  98static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
  99static void nfs_direct_write_schedule_work(struct work_struct *work);
 100
 101static inline void get_dreq(struct nfs_direct_req *dreq)
 102{
 103        atomic_inc(&dreq->io_count);
 104}
 105
 106static inline int put_dreq(struct nfs_direct_req *dreq)
 107{
 108        return atomic_dec_and_test(&dreq->io_count);
 109}
 110
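/*
 * Reference-counting sketch (illustrative): the scheduling path holds one
 * reference on the dreq while it submits requests, each pageio header takes
 * its own reference via nfs_direct_pgio_init(), and whoever drops the last
 * reference completes the dreq:
 *
 *	get_dreq(dreq);			// held by the scheduler
 *	... dispatch requests ...	// each completion calls put_dreq()
 *	if (put_dreq(dreq))		// last reference dropped here?
 *		nfs_direct_complete(dreq);
 */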
 111/**
 112 * nfs_direct_IO - NFS address space operation for direct I/O
 113 * @rw: direction (read or write)
 114 * @iocb: target I/O control block
 115 * @iov: array of vectors that define I/O buffer
 116 * @pos: offset in file to begin the operation
 117 * @nr_segs: size of iovec array
 118 *
 119 * The presence of this routine in the address space ops vector means
 120 * the NFS client supports direct I/O. However, for most direct IO, we
 121 * shunt off direct read and write requests before the VFS gets them,
 122 * so this method is only ever called for swap.
 123 */
 124ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
 125{
 126#ifndef CONFIG_NFS_SWAP
 127        dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
 128                        iocb->ki_filp->f_path.dentry->d_name.name,
 129                        (long long) pos, nr_segs);
 130
 131        return -EINVAL;
 132#else
 133        VM_BUG_ON(iocb->ki_left != PAGE_SIZE);
 134        VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
 135
 136        if (rw == READ || rw == KERNEL_READ)
 137                return nfs_file_direct_read(iocb, iov, nr_segs, pos,
 138                                rw == READ ? true : false);
 139        return nfs_file_direct_write(iocb, iov, nr_segs, pos,
 140                                rw == WRITE ? true : false);
 141#endif /* CONFIG_NFS_SWAP */
 142}
 143
 144static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
 145{
 146        unsigned int i;
 147        for (i = 0; i < npages; i++)
 148                page_cache_release(pages[i]);
 149}
 150
 151void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
 152                              struct nfs_direct_req *dreq)
 153{
 154        cinfo->lock = &dreq->lock;
 155        cinfo->mds = &dreq->mds_cinfo;
 156        cinfo->ds = &dreq->ds_cinfo;
 157        cinfo->dreq = dreq;
 158        cinfo->completion_ops = &nfs_direct_commit_completion_ops;
 159}
 160
 161static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
 162{
 163        struct nfs_direct_req *dreq;
 164
 165        dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
 166        if (!dreq)
 167                return NULL;
 168
 169        kref_init(&dreq->kref);
 170        kref_get(&dreq->kref);
 171        init_completion(&dreq->completion);
 172        INIT_LIST_HEAD(&dreq->mds_cinfo.list);
 173        INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 174        spin_lock_init(&dreq->lock);
 175
 176        return dreq;
 177}
 178
 179static void nfs_direct_req_free(struct kref *kref)
 180{
 181        struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
 182
 183        if (dreq->l_ctx != NULL)
 184                nfs_put_lock_context(dreq->l_ctx);
 185        if (dreq->ctx != NULL)
 186                put_nfs_open_context(dreq->ctx);
 187        kmem_cache_free(nfs_direct_cachep, dreq);
 188}
 189
 190static void nfs_direct_req_release(struct nfs_direct_req *dreq)
 191{
 192        kref_put(&dreq->kref, nfs_direct_req_free);
 193}
 194
 195ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
 196{
 197        return dreq->bytes_left;
 198}
 199EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
 200
 201/*
 202 * Collects and returns the final error value/byte-count.
 203 */
 204static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 205{
 206        ssize_t result = -EIOCBQUEUED;
 207
 208        /* Async requests don't wait here */
 209        if (dreq->iocb)
 210                goto out;
 211
 212        result = wait_for_completion_killable(&dreq->completion);
 213
 214        if (!result)
 215                result = dreq->error;
 216        if (!result)
 217                result = dreq->count;
 218
 219out:
 220        return (ssize_t) result;
 221}
 222
 223/*
 224 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 225 * the iocb is still valid here if this is a synchronous request.
 226 */
 227static void nfs_direct_complete(struct nfs_direct_req *dreq)
 228{
 229        if (dreq->iocb) {
 230                long res = (long) dreq->error;
 231                if (!res)
 232                        res = (long) dreq->count;
 233                aio_complete(dreq->iocb, res, 0);
 234        }
 235        complete_all(&dreq->completion);
 236
 237        nfs_direct_req_release(dreq);
 238}
 239
 240static void nfs_direct_readpage_release(struct nfs_page *req)
 241{
 242        dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
 243                req->wb_context->dentry->d_inode->i_sb->s_id,
 244                (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
 245                req->wb_bytes,
 246                (long long)req_offset(req));
 247        nfs_release_request(req);
 248}
 249
 250static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
 251{
 252        unsigned long bytes = 0;
 253        struct nfs_direct_req *dreq = hdr->dreq;
 254
 255        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 256                goto out_put;
 257
 258        spin_lock(&dreq->lock);
 259        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
 260                dreq->error = hdr->error;
 261        else
 262                dreq->count += hdr->good_bytes;
 263        spin_unlock(&dreq->lock);
 264
 265        while (!list_empty(&hdr->pages)) {
 266                struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 267                struct page *page = req->wb_page;
 268
 269                if (!PageCompound(page)) {
 270                        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
 271                                if (bytes < hdr->good_bytes)
 272                                        set_page_dirty(page);
 273                        } else
 274                                set_page_dirty(page);
 275                }
 276                bytes += req->wb_bytes;
 277                nfs_list_remove_request(req);
 278                nfs_direct_readpage_release(req);
 279        }
 280out_put:
 281        if (put_dreq(dreq))
 282                nfs_direct_complete(dreq);
 283        hdr->release(hdr);
 284}
 285
 286static void nfs_read_sync_pgio_error(struct list_head *head)
 287{
 288        struct nfs_page *req;
 289
 290        while (!list_empty(head)) {
 291                req = nfs_list_entry(head->next);
 292                nfs_list_remove_request(req);
 293                nfs_release_request(req);
 294        }
 295}
 296
 297static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
 298{
 299        get_dreq(hdr->dreq);
 300}
 301
 302static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
 303        .error_cleanup = nfs_read_sync_pgio_error,
 304        .init_hdr = nfs_direct_pgio_init,
 305        .completion = nfs_direct_read_completion,
 306};
 307
 308/*
 309 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 310 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 311 * bail and stop sending more reads.  Read length accounting is
 312 * handled automatically by nfs_direct_read_result().  Otherwise, if
 313 * no requests have been sent, just return an error.
 314 */
 315static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
 316                                                const struct iovec *iov,
 317                                                loff_t pos, bool uio)
 318{
 319        struct nfs_direct_req *dreq = desc->pg_dreq;
 320        struct nfs_open_context *ctx = dreq->ctx;
 321        struct inode *inode = ctx->dentry->d_inode;
 322        unsigned long user_addr = (unsigned long)iov->iov_base;
 323        size_t count = iov->iov_len;
 324        size_t rsize = NFS_SERVER(inode)->rsize;
 325        unsigned int pgbase;
 326        int result;
 327        ssize_t started = 0;
 328        struct page **pagevec = NULL;
 329        unsigned int npages;
 330
 331        do {
 332                size_t bytes;
 333                int i;
 334
 335                pgbase = user_addr & ~PAGE_MASK;
 336                bytes = min(max_t(size_t, rsize, PAGE_SIZE), count);
 337
 338                result = -ENOMEM;
 339                npages = nfs_page_array_len(pgbase, bytes);
 340                if (!pagevec)
 341                        pagevec = kmalloc(npages * sizeof(struct page *),
 342                                          GFP_KERNEL);
 343                if (!pagevec)
 344                        break;
 345                if (uio) {
 346                        down_read(&current->mm->mmap_sem);
 347                        result = get_user_pages(current, current->mm, user_addr,
 348                                        npages, 1, 0, pagevec, NULL);
 349                        up_read(&current->mm->mmap_sem);
 350                        if (result < 0)
 351                                break;
 352                } else {
 353                        WARN_ON(npages != 1);
 354                        result = get_kernel_page(user_addr, 1, pagevec);
 355                        if (WARN_ON(result != 1))
 356                                break;
 357                }
 358
 359                if ((unsigned)result < npages) {
 360                        bytes = result * PAGE_SIZE;
 361                        if (bytes <= pgbase) {
 362                                nfs_direct_release_pages(pagevec, result);
 363                                break;
 364                        }
 365                        bytes -= pgbase;
 366                        npages = result;
 367                }
 368
 369                for (i = 0; i < npages; i++) {
 370                        struct nfs_page *req;
 371                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 372                        /* XXX do we need to do the eof zeroing found in async_filler? */
 373                        req = nfs_create_request(dreq->ctx, dreq->inode,
 374                                                 pagevec[i],
 375                                                 pgbase, req_len);
 376                        if (IS_ERR(req)) {
 377                                result = PTR_ERR(req);
 378                                break;
 379                        }
 380                        req->wb_index = pos >> PAGE_SHIFT;
 381                        req->wb_offset = pos & ~PAGE_MASK;
 382                        if (!nfs_pageio_add_request(desc, req)) {
 383                                result = desc->pg_error;
 384                                nfs_release_request(req);
 385                                break;
 386                        }
 387                        pgbase = 0;
 388                        bytes -= req_len;
 389                        started += req_len;
 390                        user_addr += req_len;
 391                        pos += req_len;
 392                        count -= req_len;
 393                        dreq->bytes_left -= req_len;
 394                }
 395                /* The nfs_page structures now hold references to these pages */
 396                nfs_direct_release_pages(pagevec, npages);
 397        } while (count != 0 && result >= 0);
 398
 399        kfree(pagevec);
 400
 401        if (started)
 402                return started;
 403        return result < 0 ? (ssize_t) result : -EFAULT;
 404}
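/*
 * Worked example of the accounting above (a sketch assuming PAGE_SIZE is
 * 4096 and rsize >= 16384): for an iov_base ending in 0x600 and an iov_len
 * of 16384, pgbase is 0x600 (1536) and nfs_page_array_len() returns 5, so
 * five pages are pinned.  The first nfs_page covers PAGE_SIZE - pgbase =
 * 2560 bytes at page offset 0x600, the next three cover 4096 bytes each,
 * and the last covers the remaining 1536 bytes, for 16384 bytes in all.
 */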
 405
 406static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 407                                              const struct iovec *iov,
 408                                              unsigned long nr_segs,
 409                                              loff_t pos, bool uio)
 410{
 411        struct nfs_pageio_descriptor desc;
 412        ssize_t result = -EINVAL;
 413        size_t requested_bytes = 0;
 414        unsigned long seg;
 415
 416        NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
 417                             &nfs_direct_read_completion_ops);
 418        get_dreq(dreq);
 419        desc.pg_dreq = dreq;
 420
 421        for (seg = 0; seg < nr_segs; seg++) {
 422                const struct iovec *vec = &iov[seg];
 423                result = nfs_direct_read_schedule_segment(&desc, vec, pos, uio);
 424                if (result < 0)
 425                        break;
 426                requested_bytes += result;
 427                if ((size_t)result < vec->iov_len)
 428                        break;
 429                pos += vec->iov_len;
 430        }
 431
 432        nfs_pageio_complete(&desc);
 433
 434        /*
 435         * If no bytes were started, return the error, and let the
 436         * generic layer handle the completion.
 437         */
 438        if (requested_bytes == 0) {
 439                nfs_direct_req_release(dreq);
 440                return result < 0 ? result : -EIO;
 441        }
 442
 443        if (put_dreq(dreq))
 444                nfs_direct_complete(dreq);
 445        return 0;
 446}
 447
 448static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
 449                               unsigned long nr_segs, loff_t pos, bool uio)
 450{
 451        ssize_t result = -ENOMEM;
 452        struct inode *inode = iocb->ki_filp->f_mapping->host;
 453        struct nfs_direct_req *dreq;
 454        struct nfs_lock_context *l_ctx;
 455
 456        dreq = nfs_direct_req_alloc();
 457        if (dreq == NULL)
 458                goto out;
 459
 460        dreq->inode = inode;
 461        dreq->bytes_left = iov_length(iov, nr_segs);
 462        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 463        l_ctx = nfs_get_lock_context(dreq->ctx);
 464        if (IS_ERR(l_ctx)) {
 465                result = PTR_ERR(l_ctx);
 466                goto out_release;
 467        }
 468        dreq->l_ctx = l_ctx;
 469        if (!is_sync_kiocb(iocb))
 470                dreq->iocb = iocb;
 471
 472        NFS_I(inode)->read_io += iov_length(iov, nr_segs);
 473        result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
 474        if (!result)
 475                result = nfs_direct_wait(dreq);
 476out_release:
 477        nfs_direct_req_release(dreq);
 478out:
 479        return result;
 480}
 481
 482static void nfs_inode_dio_write_done(struct inode *inode)
 483{
 484        nfs_zap_mapping(inode, inode->i_mapping);
 485        inode_dio_done(inode);
 486}
 487
 488#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
 489static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 490{
 491        struct nfs_pageio_descriptor desc;
 492        struct nfs_page *req, *tmp;
 493        LIST_HEAD(reqs);
 494        struct nfs_commit_info cinfo;
 495        LIST_HEAD(failed);
 496
 497        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 498        pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
 499        spin_lock(cinfo.lock);
 500        nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
 501        spin_unlock(cinfo.lock);
 502
 503        dreq->count = 0;
 504        get_dreq(dreq);
 505
 506        NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
 507                              &nfs_direct_write_completion_ops);
 508        desc.pg_dreq = dreq;
 509
 510        list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
 511                if (!nfs_pageio_add_request(&desc, req)) {
 512                        nfs_list_remove_request(req);
 513                        nfs_list_add_request(req, &failed);
 514                        spin_lock(cinfo.lock);
 515                        dreq->flags = 0;
 516                        dreq->error = -EIO;
 517                        spin_unlock(cinfo.lock);
 518                }
 519                nfs_release_request(req);
 520        }
 521        nfs_pageio_complete(&desc);
 522
 523        while (!list_empty(&failed)) {
 524                req = nfs_list_entry(failed.next);
 525                nfs_list_remove_request(req);
 526                nfs_unlock_and_release_request(req);
 527        }
 528
 529        if (put_dreq(dreq))
 530                nfs_direct_write_complete(dreq, dreq->inode);
 531}
 532
 533static void nfs_direct_commit_complete(struct nfs_commit_data *data)
 534{
 535        struct nfs_direct_req *dreq = data->dreq;
 536        struct nfs_commit_info cinfo;
 537        struct nfs_page *req;
 538        int status = data->task.tk_status;
 539
 540        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 541        if (status < 0) {
 542                dprintk("NFS: %5u commit failed with error %d.\n",
 543                        data->task.tk_pid, status);
 544                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 545        } else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
 546                dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
 547                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 548        }
 549
 550        dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
 551        while (!list_empty(&data->pages)) {
 552                req = nfs_list_entry(data->pages.next);
 553                nfs_list_remove_request(req);
 554                if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
 555                        /* Note the rewrite will go through mds */
 556                        nfs_mark_request_commit(req, NULL, &cinfo);
 557                } else
 558                        nfs_release_request(req);
 559                nfs_unlock_and_release_request(req);
 560        }
 561
 562        if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
 563                nfs_direct_write_complete(dreq, data->inode);
 564}
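/*
 * Sketch of the unstable-write flow handled above: an UNSTABLE WRITE reply
 * carries a write verifier that changes if the server loses its uncommitted
 * data (for instance across a reboot).  The verifier from the first unstable
 * reply is saved in dreq->verf, a COMMIT is sent, and the verifier returned
 * by the COMMIT is compared against it, roughly:
 *
 *	if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf)) != 0)
 *		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;	// resend the data
 *
 * A mismatch means the server may have dropped the uncommitted writes, so
 * they are resent through nfs_direct_write_reschedule().
 */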
 565
 566static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
 567{
 568        /* There is no lock to clear */
 569}
 570
 571static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
 572        .completion = nfs_direct_commit_complete,
 573        .error_cleanup = nfs_direct_error_cleanup,
 574};
 575
 576static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
 577{
 578        int res;
 579        struct nfs_commit_info cinfo;
 580        LIST_HEAD(mds_list);
 581
 582        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 583        nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
 584        res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
 585        if (res < 0) /* res == -ENOMEM */
 586                nfs_direct_write_reschedule(dreq);
 587}
 588
 589static void nfs_direct_write_schedule_work(struct work_struct *work)
 590{
 591        struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
 592        int flags = dreq->flags;
 593
 594        dreq->flags = 0;
 595        switch (flags) {
 596                case NFS_ODIRECT_DO_COMMIT:
 597                        nfs_direct_commit_schedule(dreq);
 598                        break;
 599                case NFS_ODIRECT_RESCHED_WRITES:
 600                        nfs_direct_write_reschedule(dreq);
 601                        break;
 602                default:
 603                        nfs_inode_dio_write_done(dreq->inode);
 604                        nfs_direct_complete(dreq);
 605        }
 606}
 607
 608static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 609{
 610        schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
 611}
 612
 613#else
 614static void nfs_direct_write_schedule_work(struct work_struct *work)
 615{
 616}
 617
 618static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
 619{
 620        nfs_inode_dio_write_done(inode);
 621        nfs_direct_complete(dreq);
 622}
 623#endif
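/*
 * The stubs above cover the NFSv2-only build: v2 has no COMMIT operation
 * and every WRITE is committed to stable storage before the server replies,
 * so a direct write is already stable when the last reply arrives and
 * completion needs no commit or reschedule handling.
 */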
 624
 625/*
 626 * NB: Return the value of the first error return code.  Subsequent
 627 *     errors after the first one are ignored.
 628 */
 629/*
 630 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 631 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 632 * bail and stop sending more writes.  Write length accounting is
 633 * handled automatically by nfs_direct_write_result().  Otherwise, if
 634 * no requests have been sent, just return an error.
 635 */
 636static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
 637                                                 const struct iovec *iov,
 638                                                 loff_t pos, bool uio)
 639{
 640        struct nfs_direct_req *dreq = desc->pg_dreq;
 641        struct nfs_open_context *ctx = dreq->ctx;
 642        struct inode *inode = ctx->dentry->d_inode;
 643        unsigned long user_addr = (unsigned long)iov->iov_base;
 644        size_t count = iov->iov_len;
 645        size_t wsize = NFS_SERVER(inode)->wsize;
 646        unsigned int pgbase;
 647        int result;
 648        ssize_t started = 0;
 649        struct page **pagevec = NULL;
 650        unsigned int npages;
 651
 652        do {
 653                size_t bytes;
 654                int i;
 655
 656                pgbase = user_addr & ~PAGE_MASK;
 657                bytes = min(max_t(size_t, wsize, PAGE_SIZE), count);
 658
 659                result = -ENOMEM;
 660                npages = nfs_page_array_len(pgbase, bytes);
 661                if (!pagevec)
 662                        pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
 663                if (!pagevec)
 664                        break;
 665
 666                if (uio) {
 667                        down_read(&current->mm->mmap_sem);
 668                        result = get_user_pages(current, current->mm, user_addr,
 669                                                npages, 0, 0, pagevec, NULL);
 670                        up_read(&current->mm->mmap_sem);
 671                        if (result < 0)
 672                                break;
 673                } else {
 674                        WARN_ON(npages != 1);
 675                        result = get_kernel_page(user_addr, 0, pagevec);
 676                        if (WARN_ON(result != 1))
 677                                break;
 678                }
 679
 680                if ((unsigned)result < npages) {
 681                        bytes = result * PAGE_SIZE;
 682                        if (bytes <= pgbase) {
 683                                nfs_direct_release_pages(pagevec, result);
 684                                break;
 685                        }
 686                        bytes -= pgbase;
 687                        npages = result;
 688                }
 689
 690                for (i = 0; i < npages; i++) {
 691                        struct nfs_page *req;
 692                        unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 693
 694                        req = nfs_create_request(dreq->ctx, dreq->inode,
 695                                                 pagevec[i],
 696                                                 pgbase, req_len);
 697                        if (IS_ERR(req)) {
 698                                result = PTR_ERR(req);
 699                                break;
 700                        }
 701                        nfs_lock_request(req);
 702                        req->wb_index = pos >> PAGE_SHIFT;
 703                        req->wb_offset = pos & ~PAGE_MASK;
 704                        if (!nfs_pageio_add_request(desc, req)) {
 705                                result = desc->pg_error;
 706                                nfs_unlock_and_release_request(req);
 707                                break;
 708                        }
 709                        pgbase = 0;
 710                        bytes -= req_len;
 711                        started += req_len;
 712                        user_addr += req_len;
 713                        pos += req_len;
 714                        count -= req_len;
 715                        dreq->bytes_left -= req_len;
 716                }
 717                /* The nfs_page now hold references to these pages */
 718                nfs_direct_release_pages(pagevec, npages);
 719        } while (count != 0 && result >= 0);
 720
 721        kfree(pagevec);
 722
 723        if (started)
 724                return started;
 725        return result < 0 ? (ssize_t) result : -EFAULT;
 726}
 727
 728static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 729{
 730        struct nfs_direct_req *dreq = hdr->dreq;
 731        struct nfs_commit_info cinfo;
 732        int bit = -1;
 733        struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 734
 735        if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 736                goto out_put;
 737
 738        nfs_init_cinfo_from_dreq(&cinfo, dreq);
 739
 740        spin_lock(&dreq->lock);
 741
 742        if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
 743                dreq->flags = 0;
 744                dreq->error = hdr->error;
 745        }
 746        if (dreq->error != 0)
 747                bit = NFS_IOHDR_ERROR;
 748        else {
 749                dreq->count += hdr->good_bytes;
 750                if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
 751                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 752                        bit = NFS_IOHDR_NEED_RESCHED;
 753                } else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
 754                        if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
 755                                bit = NFS_IOHDR_NEED_RESCHED;
 756                        else if (dreq->flags == 0) {
 757                                memcpy(&dreq->verf, hdr->verf,
 758                                       sizeof(dreq->verf));
 759                                bit = NFS_IOHDR_NEED_COMMIT;
 760                                dreq->flags = NFS_ODIRECT_DO_COMMIT;
 761                        } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
 762                                if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
 763                                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 764                                        bit = NFS_IOHDR_NEED_RESCHED;
 765                                } else
 766                                        bit = NFS_IOHDR_NEED_COMMIT;
 767                        }
 768                }
 769        }
 770        spin_unlock(&dreq->lock);
 771
 772        while (!list_empty(&hdr->pages)) {
 773                req = nfs_list_entry(hdr->pages.next);
 774                nfs_list_remove_request(req);
 775                switch (bit) {
 776                case NFS_IOHDR_NEED_RESCHED:
 777                case NFS_IOHDR_NEED_COMMIT:
 778                        kref_get(&req->wb_kref);
 779                        nfs_mark_request_commit(req, hdr->lseg, &cinfo);
 780                }
 781                nfs_unlock_and_release_request(req);
 782        }
 783
 784out_put:
 785        if (put_dreq(dreq))
 786                nfs_direct_write_complete(dreq, hdr->inode);
 787        hdr->release(hdr);
 788}
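/*
 * Summary of the dreq->flags transitions made above: a first unstable reply
 * moves flags from 0 to NFS_ODIRECT_DO_COMMIT and records the write
 * verifier; a later reply whose verifier differs, or one marked
 * NFS_IOHDR_NEED_RESCHED, moves flags to NFS_ODIRECT_RESCHED_WRITES; an
 * NFS_IOHDR_ERROR reply clears flags and records the error.
 * nfs_direct_write_schedule_work() then issues the COMMIT, reschedules the
 * writes, or completes the request accordingly.
 */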
 789
 790static void nfs_write_sync_pgio_error(struct list_head *head)
 791{
 792        struct nfs_page *req;
 793
 794        while (!list_empty(head)) {
 795                req = nfs_list_entry(head->next);
 796                nfs_list_remove_request(req);
 797                nfs_unlock_and_release_request(req);
 798        }
 799}
 800
 801static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
 802        .error_cleanup = nfs_write_sync_pgio_error,
 803        .init_hdr = nfs_direct_pgio_init,
 804        .completion = nfs_direct_write_completion,
 805};
 806
 807static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 808                                               const struct iovec *iov,
 809                                               unsigned long nr_segs,
 810                                               loff_t pos, bool uio)
 811{
 812        struct nfs_pageio_descriptor desc;
 813        struct inode *inode = dreq->inode;
 814        ssize_t result = 0;
 815        size_t requested_bytes = 0;
 816        unsigned long seg;
 817
 818        NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
 819                              &nfs_direct_write_completion_ops);
 820        desc.pg_dreq = dreq;
 821        get_dreq(dreq);
 822        atomic_inc(&inode->i_dio_count);
 823
 824        NFS_I(dreq->inode)->write_io += iov_length(iov, nr_segs);
 825        for (seg = 0; seg < nr_segs; seg++) {
 826                const struct iovec *vec = &iov[seg];
 827                result = nfs_direct_write_schedule_segment(&desc, vec, pos, uio);
 828                if (result < 0)
 829                        break;
 830                requested_bytes += result;
 831                if ((size_t)result < vec->iov_len)
 832                        break;
 833                pos += vec->iov_len;
 834        }
 835        nfs_pageio_complete(&desc);
 836
 837        /*
 838         * If no bytes were started, return the error, and let the
 839         * generic layer handle the completion.
 840         */
 841        if (requested_bytes == 0) {
 842                inode_dio_done(inode);
 843                nfs_direct_req_release(dreq);
 844                return result < 0 ? result : -EIO;
 845        }
 846
 847        if (put_dreq(dreq))
 848                nfs_direct_write_complete(dreq, dreq->inode);
 849        return 0;
 850}
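/*
 * Note: the i_dio_count reference taken in nfs_direct_write_schedule_iovec()
 * is dropped either immediately on the nothing-was-scheduled error path or,
 * once the write and any COMMIT have finished, by nfs_inode_dio_write_done().
 */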
 851
 852static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 853                                unsigned long nr_segs, loff_t pos,
 854                                size_t count, bool uio)
 855{
 856        ssize_t result = -ENOMEM;
 857        struct inode *inode = iocb->ki_filp->f_mapping->host;
 858        struct nfs_direct_req *dreq;
 859        struct nfs_lock_context *l_ctx;
 860
 861        dreq = nfs_direct_req_alloc();
 862        if (!dreq)
 863                goto out;
 864
 865        dreq->inode = inode;
 866        dreq->bytes_left = count;
 867        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 868        l_ctx = nfs_get_lock_context(dreq->ctx);
 869        if (IS_ERR(l_ctx)) {
 870                result = PTR_ERR(l_ctx);
 871                goto out_release;
 872        }
 873        dreq->l_ctx = l_ctx;
 874        if (!is_sync_kiocb(iocb))
 875                dreq->iocb = iocb;
 876
 877        result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, uio);
 878        if (!result)
 879                result = nfs_direct_wait(dreq);
 880out_release:
 881        nfs_direct_req_release(dreq);
 882out:
 883        return result;
 884}
 885
 886/**
 887 * nfs_file_direct_read - file direct read operation for NFS files
 888 * @iocb: target I/O control block
 889 * @iov: vector of user buffers into which to read data
 890 * @nr_segs: size of iov vector
 892 * @pos: byte offset in file where reading starts
 * @uio: true if @iov holds user-space addresses, false for kernel pages (swap)
 892 *
 893 * We use this function for direct reads instead of calling
 894 * generic_file_aio_read() in order to avoid gfar's check to see if
 895 * the request starts before the end of the file.  For that check
 896 * to work, we must generate a GETATTR before each direct read, and
 897 * even then there is a window between the GETATTR and the subsequent
 898 * READ where the file size could change.  Our preference is simply
 899 * to do all reads the application wants, and the server will take
 900 * care of managing the end of file boundary.
 901 *
 902 * This function also eliminates unnecessarily updating the file's
 903 * atime locally, as the NFS server sets the file's atime, and this
 904 * client must read the updated atime from the server back into its
 905 * cache.
 906 */
 907ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
 908                                unsigned long nr_segs, loff_t pos, bool uio)
 909{
 910        ssize_t retval = -EINVAL;
 911        struct file *file = iocb->ki_filp;
 912        struct address_space *mapping = file->f_mapping;
 913        size_t count;
 914
 915        count = iov_length(iov, nr_segs);
 916        nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
 917
 918        dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
 919                file->f_path.dentry->d_parent->d_name.name,
 920                file->f_path.dentry->d_name.name,
 921                count, (long long) pos);
 922
 923        retval = 0;
 924        if (!count)
 925                goto out;
 926
 927        retval = nfs_sync_mapping(mapping);
 928        if (retval)
 929                goto out;
 930
 931        task_io_account_read(count);
 932
 933        retval = nfs_direct_read(iocb, iov, nr_segs, pos, uio);
 934        if (retval > 0)
 935                iocb->ki_pos = pos + retval;
 936
 937out:
 938        return retval;
 939}
 940
 941/**
 942 * nfs_file_direct_write - file direct write operation for NFS files
 943 * @iocb: target I/O control block
 944 * @iov: vector of user buffers from which to write data
 945 * @nr_segs: size of iov vector
 946 * @pos: byte offset in file where writing starts
 * @uio: true if @iov holds user-space addresses, false for kernel pages (swap)
 947 *
 948 * We use this function for direct writes instead of calling
 949 * generic_file_aio_write() in order to avoid taking the inode
 950 * semaphore and updating the i_size.  The NFS server will set
 951 * the new i_size and this client must read the updated size
 952 * back into its cache.  We let the server do generic write
 953 * parameter checking and report problems.
 954 *
 955 * We eliminate local atime updates, see direct read above.
 956 *
 957 * We avoid unnecessary page cache invalidations for normal cached
 958 * readers of this file.
 959 *
 960 * Note that O_APPEND is not supported for NFS direct writes, as there
 961 * is no atomic O_APPEND write facility in the NFS protocol.
 962 */
 963ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 964                                unsigned long nr_segs, loff_t pos, bool uio)
 965{
 966        ssize_t retval = -EINVAL;
 967        struct file *file = iocb->ki_filp;
 968        struct address_space *mapping = file->f_mapping;
 969        size_t count;
 970
 971        count = iov_length(iov, nr_segs);
 972        nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 973
 974        dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
 975                file->f_path.dentry->d_parent->d_name.name,
 976                file->f_path.dentry->d_name.name,
 977                count, (long long) pos);
 978
 979        retval = generic_write_checks(file, &pos, &count, 0);
 980        if (retval)
 981                goto out;
 982
 983        retval = -EINVAL;
 984        if ((ssize_t) count < 0)
 985                goto out;
 986        retval = 0;
 987        if (!count)
 988                goto out;
 989
 990        retval = nfs_sync_mapping(mapping);
 991        if (retval)
 992                goto out;
 993
 994        task_io_account_write(count);
 995
 996        retval = nfs_direct_write(iocb, iov, nr_segs, pos, count, uio);
 997        if (retval > 0) {
 998                struct inode *inode = mapping->host;
 999
1000                iocb->ki_pos = pos + retval;
1001                spin_lock(&inode->i_lock);
1002                if (i_size_read(inode) < iocb->ki_pos)
1003                        i_size_write(inode, iocb->ki_pos);
1004                spin_unlock(&inode->i_lock);
1005        }
1006out:
1007        return retval;
1008}
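/*
 * Note on the i_size update above: when a direct write extends the file,
 * the cached i_size is raised under i_lock so the new length is visible
 * locally without waiting for the next GETATTR to revalidate it.
 */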
1009
1010/**
1011 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1012 *
1013 */
1014int __init nfs_init_directcache(void)
1015{
1016        nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1017                                                sizeof(struct nfs_direct_req),
1018                                                0, (SLAB_RECLAIM_ACCOUNT|
1019                                                        SLAB_MEM_SPREAD),
1020                                                NULL);
1021        if (nfs_direct_cachep == NULL)
1022                return -ENOMEM;
1023
1024        return 0;
1025}
1026
1027/**
1028 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1029 *
1030 */
1031void nfs_destroy_directcache(void)
1032{
1033        kmem_cache_destroy(nfs_direct_cachep);
1034}
1035