linux/fs/ceph/addr.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/backing-dev.h>
   4#include <linux/fs.h>
   5#include <linux/mm.h>
   6#include <linux/pagemap.h>
   7#include <linux/writeback.h>    /* generic_writepages */
   8#include <linux/slab.h>
   9#include <linux/pagevec.h>
  10#include <linux/task_io_accounting_ops.h>
  11
  12#include "super.h"
  13#include "mds_client.h"
  14#include "cache.h"
  15#include <linux/ceph/osd_client.h>
  16
  17/*
  18 * Ceph address space ops.
  19 *
  20 * There are a few funny things going on here.
  21 *
  22 * The page->private field is used to reference a struct
  23 * ceph_snap_context for _every_ dirty page.  This indicates which
  24 * snapshot the page was logically dirtied in, and thus which snap
  25 * context needs to be associated with the osd write during writeback.
  26 *
  27 * Similarly, struct ceph_inode_info maintains a set of counters to
  28 * count dirty pages on the inode.  In the absence of snapshots,
  29 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
  30 *
  31 * When a snapshot is taken (that is, when the client receives
  32 * notification that a snapshot was taken), each inode with caps and
  33 * with dirty pages (dirty pages implies there is a cap) gets a new
  34 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
  35 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
  36 * moved to capsnap->dirty. (Unless a sync write is currently in
  37 * progress.  In that case, the capsnap is said to be "pending", new
  38 * writes cannot start, and the capsnap isn't "finalized" until the
  39 * write completes (or fails) and a final size/mtime for the inode for
  40 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
  41 *
  42 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
  43 * we look for the first capsnap in i_cap_snaps and write out pages in
  44 * that snap context _only_.  Then we move on to the next capsnap,
  45 * eventually reaching the "live" or "head" context (i.e., pages that
  46 * are not yet snapped) and are writing the most recently dirtied
  47 * pages.
  48 *
  49 * Invalidate and so forth must take care to ensure the dirty page
  50 * accounting is preserved.
  51 */
  52
  53#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
  54#define CONGESTION_OFF_THRESH(congestion_kb)                            \
  55        (CONGESTION_ON_THRESH(congestion_kb) -                          \
  56         (CONGESTION_ON_THRESH(congestion_kb) >> 2))
  57
  58static inline struct ceph_snap_context *page_snap_context(struct page *page)
  59{
  60        if (PagePrivate(page))
  61                return (void *)page->private;
  62        return NULL;
  63}
  64
  65/*
  66 * Dirty a page.  Optimistically adjust accounting, on the assumption
  67 * that we won't race with invalidate.  If we do, readjust.
  68 */
  69static int ceph_set_page_dirty(struct page *page)
  70{
  71        struct address_space *mapping = page->mapping;
  72        struct inode *inode;
  73        struct ceph_inode_info *ci;
  74        struct ceph_snap_context *snapc;
  75        int ret;
  76
  77        if (unlikely(!mapping))
  78                return !TestSetPageDirty(page);
  79
  80        if (PageDirty(page)) {
  81                dout("%p set_page_dirty %p idx %lu -- already dirty\n",
  82                     mapping->host, page, page->index);
  83                BUG_ON(!PagePrivate(page));
  84                return 0;
  85        }
  86
  87        inode = mapping->host;
  88        ci = ceph_inode(inode);
  89
  90        /*
  91         * Note that we're grabbing a snapc ref here without holding
  92         * any locks!
  93         */
  94        snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
  95
  96        /* dirty the head */
  97        spin_lock(&ci->i_ceph_lock);
  98        if (ci->i_head_snapc == NULL)
  99                ci->i_head_snapc = ceph_get_snap_context(snapc);
 100        ++ci->i_wrbuffer_ref_head;
 101        if (ci->i_wrbuffer_ref == 0)
 102                ihold(inode);
 103        ++ci->i_wrbuffer_ref;
 104        dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
 105             "snapc %p seq %lld (%d snaps)\n",
 106             mapping->host, page, page->index,
 107             ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
 108             ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
 109             snapc, snapc->seq, snapc->num_snaps);
 110        spin_unlock(&ci->i_ceph_lock);
 111
 112        /*
 113         * Reference snap context in page->private.  Also set
 114         * PagePrivate so that we get invalidatepage callback.
 115         */
 116        BUG_ON(PagePrivate(page));
 117        page->private = (unsigned long)snapc;
 118        SetPagePrivate(page);
 119
 120        ret = __set_page_dirty_nobuffers(page);
 121        WARN_ON(!PageLocked(page));
 122        WARN_ON(!page->mapping);
 123
 124        return ret;
 125}
 126
 127/*
 128 * If we are truncating the full page (i.e. offset == 0), adjust the
 129 * dirty page counters appropriately.  Only called if there is private
 130 * data on the page.
 131 */
 132static void ceph_invalidatepage(struct page *page, unsigned int offset,
 133                                unsigned int length)
 134{
 135        struct inode *inode;
 136        struct ceph_inode_info *ci;
 137        struct ceph_snap_context *snapc = page_snap_context(page);
 138
 139        inode = page->mapping->host;
 140        ci = ceph_inode(inode);
 141
 142        if (offset != 0 || length != PAGE_CACHE_SIZE) {
 143                dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
 144                     inode, page, page->index, offset, length);
 145                return;
 146        }
 147
 148        ceph_invalidate_fscache_page(inode, page);
 149
 150        if (!PagePrivate(page))
 151                return;
 152
 153        /*
 154         * We can get non-dirty pages here due to races between
 155         * set_page_dirty and truncate_complete_page; just spit out a
 156         * warning, in case we end up with accounting problems later.
 157         */
 158        if (!PageDirty(page))
 159                pr_err("%p invalidatepage %p page not dirty\n", inode, page);
 160
 161        ClearPageChecked(page);
 162
 163        dout("%p invalidatepage %p idx %lu full dirty page\n",
 164             inode, page, page->index);
 165
 166        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
 167        ceph_put_snap_context(snapc);
 168        page->private = 0;
 169        ClearPagePrivate(page);
 170}
 171
 172static int ceph_releasepage(struct page *page, gfp_t g)
 173{
 174        struct inode *inode = page->mapping ? page->mapping->host : NULL;
 175        dout("%p releasepage %p idx %lu\n", inode, page, page->index);
 176        WARN_ON(PageDirty(page));
 177
 178        /* Can we release the page from the cache? */
 179        if (!ceph_release_fscache_page(page, g))
 180                return 0;
 181
 182        return !PagePrivate(page);
 183}
 184
 185/*
 186 * read a single page, without unlocking it.
 187 */
 188static int readpage_nounlock(struct file *filp, struct page *page)
 189{
 190        struct inode *inode = file_inode(filp);
 191        struct ceph_inode_info *ci = ceph_inode(inode);
 192        struct ceph_osd_client *osdc =
 193                &ceph_inode_to_client(inode)->client->osdc;
 194        int err = 0;
 195        u64 len = PAGE_CACHE_SIZE;
 196
 197        err = ceph_readpage_from_fscache(inode, page);
 198
 199        if (err == 0)
 200                goto out;
 201
 202        dout("readpage inode %p file %p page %p index %lu\n",
 203             inode, filp, page, page->index);
 204        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
 205                                  (u64) page_offset(page), &len,
 206                                  ci->i_truncate_seq, ci->i_truncate_size,
 207                                  &page, 1, 0);
 208        if (err == -ENOENT)
 209                err = 0;
 210        if (err < 0) {
 211                SetPageError(page);
 212                goto out;
 213        } else {
 214                if (err < PAGE_CACHE_SIZE) {
 215                /* zero fill remainder of page */
 216                        zero_user_segment(page, err, PAGE_CACHE_SIZE);
 217                } else {
 218                        flush_dcache_page(page);
 219                }
 220        }
 221        SetPageUptodate(page);
 222
 223        if (err >= 0)
 224                ceph_readpage_to_fscache(inode, page);
 225
 226out:
 227        return err < 0 ? err : 0;
 228}
 229
 230static int ceph_readpage(struct file *filp, struct page *page)
 231{
 232        int r = readpage_nounlock(filp, page);
 233        unlock_page(page);
 234        return r;
 235}
 236
 237/*
 238 * Finish an async read(ahead) op.
 239 */
 240static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 241{
 242        struct inode *inode = req->r_inode;
 243        struct ceph_osd_data *osd_data;
 244        int rc = req->r_result;
 245        int bytes = le32_to_cpu(msg->hdr.data_len);
 246        int num_pages;
 247        int i;
 248
 249        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 250
 251        /* unlock all pages, zeroing any data we didn't read */
 252        osd_data = osd_req_op_extent_osd_data(req, 0);
 253        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 254        num_pages = calc_pages_for((u64)osd_data->alignment,
 255                                        (u64)osd_data->length);
 256        for (i = 0; i < num_pages; i++) {
 257                struct page *page = osd_data->pages[i];
 258
 259                if (bytes < (int)PAGE_CACHE_SIZE) {
 260                        /* zero (remainder of) page */
 261                        int s = bytes < 0 ? 0 : bytes;
 262                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
 263                }
 264                dout("finish_read %p uptodate %p idx %lu\n", inode, page,
 265                     page->index);
 266                flush_dcache_page(page);
 267                SetPageUptodate(page);
 268                ceph_readpage_to_fscache(inode, page);
 269                unlock_page(page);
 270                page_cache_release(page);
 271                bytes -= PAGE_CACHE_SIZE;
 272        }
 273        kfree(osd_data->pages);
 274}
 275
 276static void ceph_unlock_page_vector(struct page **pages, int num_pages)
 277{
 278        int i;
 279
 280        for (i = 0; i < num_pages; i++)
 281                unlock_page(pages[i]);
 282}
 283
 284/*
 285 * start an async read(ahead) operation.  return nr_pages we submitted
 286 * a read for on success, or negative error code.
 287 */
 288static int start_read(struct inode *inode, struct list_head *page_list, int max)
 289{
 290        struct ceph_osd_client *osdc =
 291                &ceph_inode_to_client(inode)->client->osdc;
 292        struct ceph_inode_info *ci = ceph_inode(inode);
 293        struct page *page = list_entry(page_list->prev, struct page, lru);
 294        struct ceph_vino vino;
 295        struct ceph_osd_request *req;
 296        u64 off;
 297        u64 len;
 298        int i;
 299        struct page **pages;
 300        pgoff_t next_index;
 301        int nr_pages = 0;
 302        int ret;
 303
 304        off = (u64) page_offset(page);
 305
 306        /* count pages */
 307        next_index = page->index;
 308        list_for_each_entry_reverse(page, page_list, lru) {
 309                if (page->index != next_index)
 310                        break;
 311                nr_pages++;
 312                next_index++;
 313                if (max && nr_pages == max)
 314                        break;
 315        }
 316        len = nr_pages << PAGE_CACHE_SHIFT;
 317        dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
 318             off, len);
 319        vino = ceph_vino(inode);
 320        req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
 321                                    1, CEPH_OSD_OP_READ,
 322                                    CEPH_OSD_FLAG_READ, NULL,
 323                                    ci->i_truncate_seq, ci->i_truncate_size,
 324                                    false);
 325        if (IS_ERR(req))
 326                return PTR_ERR(req);
 327
 328        /* build page vector */
 329        nr_pages = calc_pages_for(0, len);
 330        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
 331        ret = -ENOMEM;
 332        if (!pages)
 333                goto out;
 334        for (i = 0; i < nr_pages; ++i) {
 335                page = list_entry(page_list->prev, struct page, lru);
 336                BUG_ON(PageLocked(page));
 337                list_del(&page->lru);
 338
 339                dout("start_read %p adding %p idx %lu\n", inode, page,
 340                     page->index);
 341                if (add_to_page_cache_lru(page, &inode->i_data, page->index,
 342                                          GFP_NOFS)) {
 343                        ceph_fscache_uncache_page(inode, page);
 344                        page_cache_release(page);
 345                        dout("start_read %p add_to_page_cache failed %p\n",
 346                             inode, page);
 347                        nr_pages = i;
 348                        goto out_pages;
 349                }
 350                pages[i] = page;
 351        }
 352        osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 353        req->r_callback = finish_read;
 354        req->r_inode = inode;
 355
 356        ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
 357
 358        dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
 359        ret = ceph_osdc_start_request(osdc, req, false);
 360        if (ret < 0)
 361                goto out_pages;
 362        ceph_osdc_put_request(req);
 363        return nr_pages;
 364
 365out_pages:
 366        ceph_unlock_page_vector(pages, nr_pages);
 367        ceph_release_page_vector(pages, nr_pages);
 368out:
 369        ceph_osdc_put_request(req);
 370        return ret;
 371}
 372
 373
 374/*
 375 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 376 * the caller (VM) cleans them up.
 377 */
 378static int ceph_readpages(struct file *file, struct address_space *mapping,
 379                          struct list_head *page_list, unsigned nr_pages)
 380{
 381        struct inode *inode = file_inode(file);
 382        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 383        int rc = 0;
 384        int max = 0;
 385
 386        rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
 387                                         &nr_pages);
 388
 389        if (rc == 0)
 390                goto out;
 391
 392        if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
 393                max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
 394                        >> PAGE_SHIFT;
 395
 396        dout("readpages %p file %p nr_pages %d max %d\n", inode,
 397                file, nr_pages,
 398             max);
 399        while (!list_empty(page_list)) {
 400                rc = start_read(inode, page_list, max);
 401                if (rc < 0)
 402                        goto out;
 403                BUG_ON(rc == 0);
 404        }
 405out:
 406        ceph_fscache_readpages_cancel(inode, page_list);
 407
 408        dout("readpages %p file %p ret %d\n", inode, file, rc);
 409        return rc;
 410}
 411
 412/*
 413 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 414 * only snap context we are allowed to write back.
 415 */
 416static struct ceph_snap_context *get_oldest_context(struct inode *inode,
 417                                                    u64 *snap_size)
 418{
 419        struct ceph_inode_info *ci = ceph_inode(inode);
 420        struct ceph_snap_context *snapc = NULL;
 421        struct ceph_cap_snap *capsnap = NULL;
 422
 423        spin_lock(&ci->i_ceph_lock);
 424        list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
 425                dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
 426                     capsnap->context, capsnap->dirty_pages);
 427                if (capsnap->dirty_pages) {
 428                        snapc = ceph_get_snap_context(capsnap->context);
 429                        if (snap_size)
 430                                *snap_size = capsnap->size;
 431                        break;
 432                }
 433        }
 434        if (!snapc && ci->i_wrbuffer_ref_head) {
 435                snapc = ceph_get_snap_context(ci->i_head_snapc);
 436                dout(" head snapc %p has %d dirty pages\n",
 437                     snapc, ci->i_wrbuffer_ref_head);
 438        }
 439        spin_unlock(&ci->i_ceph_lock);
 440        return snapc;
 441}
 442
 443/*
 444 * Write a single page, but leave the page locked.
 445 *
 446 * If we get a write error, set the page error bit, but still adjust the
 447 * dirty page accounting (i.e., page is no longer dirty).
 448 */
 449static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 450{
 451        struct inode *inode;
 452        struct ceph_inode_info *ci;
 453        struct ceph_fs_client *fsc;
 454        struct ceph_osd_client *osdc;
 455        struct ceph_snap_context *snapc, *oldest;
 456        loff_t page_off = page_offset(page);
 457        long writeback_stat;
 458        u64 truncate_size, snap_size = 0;
 459        u32 truncate_seq;
 460        int err = 0, len = PAGE_CACHE_SIZE;
 461
 462        dout("writepage %p idx %lu\n", page, page->index);
 463
 464        if (!page->mapping || !page->mapping->host) {
 465                dout("writepage %p - no mapping\n", page);
 466                return -EFAULT;
 467        }
 468        inode = page->mapping->host;
 469        ci = ceph_inode(inode);
 470        fsc = ceph_inode_to_client(inode);
 471        osdc = &fsc->client->osdc;
 472
 473        /* verify this is a writeable snap context */
 474        snapc = page_snap_context(page);
 475        if (snapc == NULL) {
 476                dout("writepage %p page %p not dirty?\n", inode, page);
 477                goto out;
 478        }
 479        oldest = get_oldest_context(inode, &snap_size);
 480        if (snapc->seq > oldest->seq) {
 481                dout("writepage %p page %p snapc %p not writeable - noop\n",
 482                     inode, page, snapc);
 483                /* we should only noop if called by kswapd */
 484                WARN_ON((current->flags & PF_MEMALLOC) == 0);
 485                ceph_put_snap_context(oldest);
 486                goto out;
 487        }
 488        ceph_put_snap_context(oldest);
 489
 490        spin_lock(&ci->i_ceph_lock);
 491        truncate_seq = ci->i_truncate_seq;
 492        truncate_size = ci->i_truncate_size;
 493        if (!snap_size)
 494                snap_size = i_size_read(inode);
 495        spin_unlock(&ci->i_ceph_lock);
 496
 497        /* is this a partial page at end of file? */
 498        if (page_off >= snap_size) {
 499                dout("%p page eof %llu\n", page, snap_size);
 500                goto out;
 501        }
 502        if (snap_size < page_off + len)
 503                len = snap_size - page_off;
 504
 505        dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
 506             inode, page, page->index, page_off, len, snapc);
 507
 508        writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
 509        if (writeback_stat >
 510            CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
 511                set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
 512
 513        ceph_readpage_to_fscache(inode, page);
 514
 515        set_page_writeback(page);
 516        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
 517                                   &ci->i_layout, snapc,
 518                                   page_off, len,
 519                                   truncate_seq, truncate_size,
 520                                   &inode->i_mtime, &page, 1);
 521        if (err < 0) {
 522                dout("writepage setting page/mapping error %d %p\n", err, page);
 523                SetPageError(page);
 524                mapping_set_error(&inode->i_data, err);
 525                if (wbc)
 526                        wbc->pages_skipped++;
 527        } else {
 528                dout("writepage cleaned page %p\n", page);
 529                err = 0;  /* vfs expects us to return 0 */
 530        }
 531        page->private = 0;
 532        ClearPagePrivate(page);
 533        end_page_writeback(page);
 534        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
 535        ceph_put_snap_context(snapc);  /* page's reference */
 536out:
 537        return err;
 538}
 539
 540static int ceph_writepage(struct page *page, struct writeback_control *wbc)
 541{
 542        int err;
 543        struct inode *inode = page->mapping->host;
 544        BUG_ON(!inode);
 545        ihold(inode);
 546        err = writepage_nounlock(page, wbc);
 547        unlock_page(page);
 548        iput(inode);
 549        return err;
 550}
 551
 552
 553/*
 554 * lame release_pages helper.  release_pages() isn't exported to
 555 * modules.
 556 */
 557static void ceph_release_pages(struct page **pages, int num)
 558{
 559        struct pagevec pvec;
 560        int i;
 561
 562        pagevec_init(&pvec, 0);
 563        for (i = 0; i < num; i++) {
 564                if (pagevec_add(&pvec, pages[i]) == 0)
 565                        pagevec_release(&pvec);
 566        }
 567        pagevec_release(&pvec);
 568}
 569
 570/*
 571 * async writeback completion handler.
 572 *
 573 * If we get an error, set the mapping error bit, but not the individual
 574 * page error bits.
 575 */
 576static void writepages_finish(struct ceph_osd_request *req,
 577                              struct ceph_msg *msg)
 578{
 579        struct inode *inode = req->r_inode;
 580        struct ceph_inode_info *ci = ceph_inode(inode);
 581        struct ceph_osd_data *osd_data;
 582        unsigned wrote;
 583        struct page *page;
 584        int num_pages;
 585        int i;
 586        struct ceph_snap_context *snapc = req->r_snapc;
 587        struct address_space *mapping = inode->i_mapping;
 588        int rc = req->r_result;
 589        u64 bytes = req->r_ops[0].extent.length;
 590        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 591        long writeback_stat;
 592        unsigned issued = ceph_caps_issued(ci);
 593
 594        osd_data = osd_req_op_extent_osd_data(req, 0);
 595        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 596        num_pages = calc_pages_for((u64)osd_data->alignment,
 597                                        (u64)osd_data->length);
 598        if (rc >= 0) {
 599                /*
 600                 * Assume we wrote the pages we originally sent.  The
 601                 * osd might reply with fewer pages if our writeback
 602                 * raced with a truncation and was adjusted at the osd,
 603                 * so don't believe the reply.
 604                 */
 605                wrote = num_pages;
 606        } else {
 607                wrote = 0;
 608                mapping_set_error(mapping, rc);
 609        }
 610        dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
 611             inode, rc, bytes, wrote);
 612
 613        /* clean all pages */
 614        for (i = 0; i < num_pages; i++) {
 615                page = osd_data->pages[i];
 616                BUG_ON(!page);
 617                WARN_ON(!PageUptodate(page));
 618
 619                writeback_stat =
 620                        atomic_long_dec_return(&fsc->writeback_count);
 621                if (writeback_stat <
 622                    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
 623                        clear_bdi_congested(&fsc->backing_dev_info,
 624                                            BLK_RW_ASYNC);
 625
 626                ceph_put_snap_context(page_snap_context(page));
 627                page->private = 0;
 628                ClearPagePrivate(page);
 629                dout("unlocking %d %p\n", i, page);
 630                end_page_writeback(page);
 631
 632                /*
 633                 * We lost the cache cap, need to truncate the page before
 634                 * it is unlocked, otherwise we'd truncate it later in the
 635                 * page truncation thread, possibly losing some data that
 636                 * raced its way in
 637                 */
 638                if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
 639                        generic_error_remove_page(inode->i_mapping, page);
 640
 641                unlock_page(page);
 642        }
 643        dout("%p wrote+cleaned %d pages\n", inode, wrote);
 644        ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 645
 646        ceph_release_pages(osd_data->pages, num_pages);
 647        if (osd_data->pages_from_pool)
 648                mempool_free(osd_data->pages,
 649                             ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
 650        else
 651                kfree(osd_data->pages);
 652        ceph_osdc_put_request(req);
 653}
 654
 655/*
 656 * initiate async writeback
 657 */
 658static int ceph_writepages_start(struct address_space *mapping,
 659                                 struct writeback_control *wbc)
 660{
 661        struct inode *inode = mapping->host;
 662        struct ceph_inode_info *ci = ceph_inode(inode);
 663        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 664        struct ceph_vino vino = ceph_vino(inode);
 665        pgoff_t index, start, end;
 666        int range_whole = 0;
 667        int should_loop = 1;
 668        pgoff_t max_pages = 0, max_pages_ever = 0;
 669        struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
 670        struct pagevec pvec;
 671        int done = 0;
 672        int rc = 0;
 673        unsigned wsize = 1 << inode->i_blkbits;
 674        struct ceph_osd_request *req = NULL;
 675        int do_sync;
 676        u64 truncate_size, snap_size;
 677        u32 truncate_seq;
 678
 679        /*
 680         * Include a 'sync' in the OSD request if this is a data
 681         * integrity write (e.g., O_SYNC write or fsync()), or if our
 682         * cap is being revoked.
 683         */
 684        if ((wbc->sync_mode == WB_SYNC_ALL) ||
 685                ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
 686                do_sync = 1;
 687        dout("writepages_start %p dosync=%d (mode=%s)\n",
 688             inode, do_sync,
 689             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 690             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 691
 692        if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
 693                pr_warning("writepage_start %p on forced umount\n", inode);
 694                return -EIO; /* we're in a forced umount, don't write! */
 695        }
 696        if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
 697                wsize = fsc->mount_options->wsize;
 698        if (wsize < PAGE_CACHE_SIZE)
 699                wsize = PAGE_CACHE_SIZE;
 700        max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
 701
 702        pagevec_init(&pvec, 0);
 703
 704        /* where to start/end? */
 705        if (wbc->range_cyclic) {
 706                start = mapping->writeback_index; /* Start from prev offset */
 707                end = -1;
 708                dout(" cyclic, start at %lu\n", start);
 709        } else {
 710                start = wbc->range_start >> PAGE_CACHE_SHIFT;
 711                end = wbc->range_end >> PAGE_CACHE_SHIFT;
 712                if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
 713                        range_whole = 1;
 714                should_loop = 0;
 715                dout(" not cyclic, %lu to %lu\n", start, end);
 716        }
 717        index = start;
 718
 719retry:
 720        /* find oldest snap context with dirty data */
 721        ceph_put_snap_context(snapc);
 722        snap_size = 0;
 723        snapc = get_oldest_context(inode, &snap_size);
 724        if (!snapc) {
 725                /* hmm, why does writepages get called when there
 726                   is no dirty data? */
 727                dout(" no snap context with dirty data?\n");
 728                goto out;
 729        }
 730        if (snap_size == 0)
 731                snap_size = i_size_read(inode);
 732        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
 733             snapc, snapc->seq, snapc->num_snaps);
 734
 735        spin_lock(&ci->i_ceph_lock);
 736        truncate_seq = ci->i_truncate_seq;
 737        truncate_size = ci->i_truncate_size;
 738        if (!snap_size)
 739                snap_size = i_size_read(inode);
 740        spin_unlock(&ci->i_ceph_lock);
 741
 742        if (last_snapc && snapc != last_snapc) {
 743                /* if we switched to a newer snapc, restart our scan at the
 744                 * start of the original file range. */
 745                dout("  snapc differs from last pass, restarting at %lu\n",
 746                     index);
 747                index = start;
 748        }
 749        last_snapc = snapc;
 750
 751        while (!done && index <= end) {
 752                int num_ops = do_sync ? 2 : 1;
 753                unsigned i;
 754                int first;
 755                pgoff_t next;
 756                int pvec_pages, locked_pages;
 757                struct page **pages = NULL;
 758                mempool_t *pool = NULL; /* Becomes non-null if mempool used */
 759                struct page *page;
 760                int want;
 761                u64 offset, len;
 762                long writeback_stat;
 763
 764                next = 0;
 765                locked_pages = 0;
 766                max_pages = max_pages_ever;
 767
 768get_more_pages:
 769                first = -1;
 770                want = min(end - index,
 771                           min((pgoff_t)PAGEVEC_SIZE,
 772                               max_pages - (pgoff_t)locked_pages) - 1)
 773                        + 1;
 774                pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
 775                                                PAGECACHE_TAG_DIRTY,
 776                                                want);
 777                dout("pagevec_lookup_tag got %d\n", pvec_pages);
 778                if (!pvec_pages && !locked_pages)
 779                        break;
 780                for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
 781                        page = pvec.pages[i];
 782                        dout("? %p idx %lu\n", page, page->index);
 783                        if (locked_pages == 0)
 784                                lock_page(page);  /* first page */
 785                        else if (!trylock_page(page))
 786                                break;
 787
 788                        /* only dirty pages, or our accounting breaks */
 789                        if (unlikely(!PageDirty(page)) ||
 790                            unlikely(page->mapping != mapping)) {
 791                                dout("!dirty or !mapping %p\n", page);
 792                                unlock_page(page);
 793                                break;
 794                        }
 795                        if (!wbc->range_cyclic && page->index > end) {
 796                                dout("end of range %p\n", page);
 797                                done = 1;
 798                                unlock_page(page);
 799                                break;
 800                        }
 801                        if (next && (page->index != next)) {
 802                                dout("not consecutive %p\n", page);
 803                                unlock_page(page);
 804                                break;
 805                        }
 806                        if (wbc->sync_mode != WB_SYNC_NONE) {
 807                                dout("waiting on writeback %p\n", page);
 808                                wait_on_page_writeback(page);
 809                        }
 810                        if (page_offset(page) >= snap_size) {
 811                                dout("%p page eof %llu\n", page, snap_size);
 812                                done = 1;
 813                                unlock_page(page);
 814                                break;
 815                        }
 816                        if (PageWriteback(page)) {
 817                                dout("%p under writeback\n", page);
 818                                unlock_page(page);
 819                                break;
 820                        }
 821
 822                        /* only if matching snap context */
 823                        pgsnapc = page_snap_context(page);
 824                        if (pgsnapc->seq > snapc->seq) {
 825                                dout("page snapc %p %lld > oldest %p %lld\n",
 826                                     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
 827                                unlock_page(page);
 828                                if (!locked_pages)
 829                                        continue; /* keep looking for snap */
 830                                break;
 831                        }
 832
 833                        if (!clear_page_dirty_for_io(page)) {
 834                                dout("%p !clear_page_dirty_for_io\n", page);
 835                                unlock_page(page);
 836                                break;
 837                        }
 838
 839                        /*
 840                         * We have something to write.  If this is
 841                         * the first locked page this time through,
 842                         * allocate an osd request and a page array
 843                         * that it will use.
 844                         */
 845                        if (locked_pages == 0) {
 846                                BUG_ON(pages);
 847                                /* prepare async write request */
 848                                offset = (u64)page_offset(page);
 849                                len = wsize;
 850                                req = ceph_osdc_new_request(&fsc->client->osdc,
 851                                                        &ci->i_layout, vino,
 852                                                        offset, &len, num_ops,
 853                                                        CEPH_OSD_OP_WRITE,
 854                                                        CEPH_OSD_FLAG_WRITE |
 855                                                        CEPH_OSD_FLAG_ONDISK,
 856                                                        snapc, truncate_seq,
 857                                                        truncate_size, true);
 858                                if (IS_ERR(req)) {
 859                                        rc = PTR_ERR(req);
 860                                        unlock_page(page);
 861                                        break;
 862                                }
 863
 864                                req->r_callback = writepages_finish;
 865                                req->r_inode = inode;
 866
 867                                max_pages = calc_pages_for(0, (u64)len);
 868                                pages = kmalloc(max_pages * sizeof (*pages),
 869                                                GFP_NOFS);
 870                                if (!pages) {
 871                                        pool = fsc->wb_pagevec_pool;
 872                                        pages = mempool_alloc(pool, GFP_NOFS);
 873                                        BUG_ON(!pages);
 874                                }
 875                        }
 876
 877                        /* note position of first page in pvec */
 878                        if (first < 0)
 879                                first = i;
 880                        dout("%p will write page %p idx %lu\n",
 881                             inode, page, page->index);
 882
 883                        writeback_stat =
 884                               atomic_long_inc_return(&fsc->writeback_count);
 885                        if (writeback_stat > CONGESTION_ON_THRESH(
 886                                    fsc->mount_options->congestion_kb)) {
 887                                set_bdi_congested(&fsc->backing_dev_info,
 888                                                  BLK_RW_ASYNC);
 889                        }
 890
 891                        set_page_writeback(page);
 892                        pages[locked_pages] = page;
 893                        locked_pages++;
 894                        next = page->index + 1;
 895                }
 896
 897                /* did we get anything? */
 898                if (!locked_pages)
 899                        goto release_pvec_pages;
 900                if (i) {
 901                        int j;
 902                        BUG_ON(!locked_pages || first < 0);
 903
 904                        if (pvec_pages && i == pvec_pages &&
 905                            locked_pages < max_pages) {
 906                                dout("reached end pvec, trying for more\n");
 907                                pagevec_reinit(&pvec);
 908                                goto get_more_pages;
 909                        }
 910
 911                        /* shift unused pages over in the pvec...  we
 912                         * will need to release them below. */
 913                        for (j = i; j < pvec_pages; j++) {
 914                                dout(" pvec leftover page %p\n",
 915                                     pvec.pages[j]);
 916                                pvec.pages[j-i+first] = pvec.pages[j];
 917                        }
 918                        pvec.nr -= i-first;
 919                }
 920
 921                /* Format the osd request message and submit the write */
 922
 923                offset = page_offset(pages[0]);
 924                len = min(snap_size - offset,
 925                          (u64)locked_pages << PAGE_CACHE_SHIFT);
 926                dout("writepages got %d pages at %llu~%llu\n",
 927                     locked_pages, offset, len);
 928
 929                osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
 930                                                        !!pool, false);
 931
 932                pages = NULL;   /* request message now owns the pages array */
 933                pool = NULL;
 934
 935                /* Update the write op length in case we changed it */
 936
 937                osd_req_op_extent_update(req, 0, len);
 938
 939                vino = ceph_vino(inode);
 940                ceph_osdc_build_request(req, offset, snapc, vino.snap,
 941                                        &inode->i_mtime);
 942
 943                rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
 944                BUG_ON(rc);
 945                req = NULL;
 946
 947                /* continue? */
 948                index = next;
 949                wbc->nr_to_write -= locked_pages;
 950                if (wbc->nr_to_write <= 0)
 951                        done = 1;
 952
 953release_pvec_pages:
 954                dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
 955                     pvec.nr ? pvec.pages[0] : NULL);
 956                pagevec_release(&pvec);
 957
 958                if (locked_pages && !done)
 959                        goto retry;
 960        }
 961
 962        if (should_loop && !done) {
 963                /* more to do; loop back to beginning of file */
 964                dout("writepages looping back to beginning of file\n");
 965                should_loop = 0;
 966                index = 0;
 967                goto retry;
 968        }
 969
 970        if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
 971                mapping->writeback_index = index;
 972
 973out:
 974        if (req)
 975                ceph_osdc_put_request(req);
 976        ceph_put_snap_context(snapc);
 977        dout("writepages done, rc = %d\n", rc);
 978        return rc;
 979}
 980
 981
 982
 983/*
 984 * See if a given @snapc is either writeable, or already written.
 985 */
 986static int context_is_writeable_or_written(struct inode *inode,
 987                                           struct ceph_snap_context *snapc)
 988{
 989        struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
 990        int ret = !oldest || snapc->seq <= oldest->seq;
 991
 992        ceph_put_snap_context(oldest);
 993        return ret;
 994}
 995
 996/*
 997 * We are only allowed to write into/dirty the page if the page is
 998 * clean, or already dirty within the same snap context.
 999 *
1000 * called with page locked.
1001 * return success with page locked,
1002 * or any failure (incl -EAGAIN) with page unlocked.
1003 */
1004static int ceph_update_writeable_page(struct file *file,
1005                            loff_t pos, unsigned len,
1006                            struct page *page)
1007{
1008        struct inode *inode = file_inode(file);
1009        struct ceph_inode_info *ci = ceph_inode(inode);
1010        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1011        loff_t page_off = pos & PAGE_CACHE_MASK;
1012        int pos_in_page = pos & ~PAGE_CACHE_MASK;
1013        int end_in_page = pos_in_page + len;
1014        loff_t i_size;
1015        int r;
1016        struct ceph_snap_context *snapc, *oldest;
1017
1018retry_locked:
1019        /* writepages currently holds page lock, but if we change that later, */
1020        wait_on_page_writeback(page);
1021
1022        /* check snap context */
1023        BUG_ON(!ci->i_snap_realm);
1024        down_read(&mdsc->snap_rwsem);
1025        BUG_ON(!ci->i_snap_realm->cached_context);
1026        snapc = page_snap_context(page);
1027        if (snapc && snapc != ci->i_head_snapc) {
1028                /*
1029                 * this page is already dirty in another (older) snap
1030                 * context!  is it writeable now?
1031                 */
1032                oldest = get_oldest_context(inode, NULL);
1033                up_read(&mdsc->snap_rwsem);
1034
1035                if (snapc->seq > oldest->seq) {
1036                        ceph_put_snap_context(oldest);
1037                        dout(" page %p snapc %p not current or oldest\n",
1038                             page, snapc);
1039                        /*
1040                         * queue for writeback, and wait for snapc to
1041                         * be writeable or written
1042                         */
1043                        snapc = ceph_get_snap_context(snapc);
1044                        unlock_page(page);
1045                        ceph_queue_writeback(inode);
1046                        r = wait_event_interruptible(ci->i_cap_wq,
1047                               context_is_writeable_or_written(inode, snapc));
1048                        ceph_put_snap_context(snapc);
1049                        if (r == -ERESTARTSYS)
1050                                return r;
1051                        return -EAGAIN;
1052                }
1053                ceph_put_snap_context(oldest);
1054
1055                /* yay, writeable, do it now (without dropping page lock) */
1056                dout(" page %p snapc %p not current, but oldest\n",
1057                     page, snapc);
1058                if (!clear_page_dirty_for_io(page))
1059                        goto retry_locked;
1060                r = writepage_nounlock(page, NULL);
1061                if (r < 0)
1062                        goto fail_nosnap;
1063                goto retry_locked;
1064        }
1065
1066        if (PageUptodate(page)) {
1067                dout(" page %p already uptodate\n", page);
1068                return 0;
1069        }
1070
1071        /* full page? */
1072        if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
1073                return 0;
1074
1075        /* past end of file? */
1076        i_size = inode->i_size;   /* caller holds i_mutex */
1077
1078        if (i_size + len > inode->i_sb->s_maxbytes) {
1079                /* file is too big */
1080                r = -EINVAL;
1081                goto fail;
1082        }
1083
1084        if (page_off >= i_size ||
1085            (pos_in_page == 0 && (pos+len) >= i_size &&
1086             end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
1087                dout(" zeroing %p 0 - %d and %d - %d\n",
1088                     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
1089                zero_user_segments(page,
1090                                   0, pos_in_page,
1091                                   end_in_page, PAGE_CACHE_SIZE);
1092                return 0;
1093        }
1094
1095        /* we need to read it. */
1096        up_read(&mdsc->snap_rwsem);
1097        r = readpage_nounlock(file, page);
1098        if (r < 0)
1099                goto fail_nosnap;
1100        goto retry_locked;
1101
1102fail:
1103        up_read(&mdsc->snap_rwsem);
1104fail_nosnap:
1105        unlock_page(page);
1106        return r;
1107}
1108
1109/*
1110 * We are only allowed to write into/dirty the page if the page is
1111 * clean, or already dirty within the same snap context.
1112 */
1113static int ceph_write_begin(struct file *file, struct address_space *mapping,
1114                            loff_t pos, unsigned len, unsigned flags,
1115                            struct page **pagep, void **fsdata)
1116{
1117        struct inode *inode = file_inode(file);
1118        struct page *page;
1119        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
1120        int r;
1121
1122        do {
1123                /* get a page */
1124                page = grab_cache_page_write_begin(mapping, index, 0);
1125                if (!page)
1126                        return -ENOMEM;
1127                *pagep = page;
1128
1129                dout("write_begin file %p inode %p page %p %d~%d\n", file,
1130                     inode, page, (int)pos, (int)len);
1131
1132                r = ceph_update_writeable_page(file, pos, len, page);
1133        } while (r == -EAGAIN);
1134
1135        return r;
1136}
1137
1138/*
1139 * we don't do anything in here that simple_write_end doesn't do
1140 * except adjust dirty page accounting and drop read lock on
1141 * mdsc->snap_rwsem.
1142 */
1143static int ceph_write_end(struct file *file, struct address_space *mapping,
1144                          loff_t pos, unsigned len, unsigned copied,
1145                          struct page *page, void *fsdata)
1146{
1147        struct inode *inode = file_inode(file);
1148        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1149        struct ceph_mds_client *mdsc = fsc->mdsc;
1150        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
1151        int check_cap = 0;
1152
1153        dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1154             inode, page, (int)pos, (int)copied, (int)len);
1155
1156        /* zero the stale part of the page if we did a short copy */
1157        if (copied < len)
1158                zero_user_segment(page, from+copied, len);
1159
1160        /* did file size increase? */
1161        /* (no need for i_size_read(); we caller holds i_mutex */
1162        if (pos+copied > inode->i_size)
1163                check_cap = ceph_inode_set_size(inode, pos+copied);
1164
1165        if (!PageUptodate(page))
1166                SetPageUptodate(page);
1167
1168        set_page_dirty(page);
1169
1170        unlock_page(page);
1171        up_read(&mdsc->snap_rwsem);
1172        page_cache_release(page);
1173
1174        if (check_cap)
1175                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1176
1177        return copied;
1178}
1179
1180/*
1181 * we set .direct_IO to indicate direct io is supported, but since we
1182 * intercept O_DIRECT reads and writes early, this function should
1183 * never get called.
1184 */
1185static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
1186                              const struct iovec *iov,
1187                              loff_t pos, unsigned long nr_segs)
1188{
1189        WARN_ON(1);
1190        return -EINVAL;
1191}
1192
1193const struct address_space_operations ceph_aops = {
1194        .readpage = ceph_readpage,
1195        .readpages = ceph_readpages,
1196        .writepage = ceph_writepage,
1197        .writepages = ceph_writepages_start,
1198        .write_begin = ceph_write_begin,
1199        .write_end = ceph_write_end,
1200        .set_page_dirty = ceph_set_page_dirty,
1201        .invalidatepage = ceph_invalidatepage,
1202        .releasepage = ceph_releasepage,
1203        .direct_IO = ceph_direct_io,
1204};
1205
1206
1207/*
1208 * vm ops
1209 */
1210
1211/*
1212 * Reuse write_begin here for simplicity.
1213 */
1214static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{
1216        struct inode *inode = file_inode(vma->vm_file);
1217        struct page *page = vmf->page;
1218        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1219        loff_t off = page_offset(page);
1220        loff_t size, len;
1221        int ret;
1222
1223        /* Update time before taking page lock */
1224        file_update_time(vma->vm_file);
1225
1226        size = i_size_read(inode);
1227        if (off + PAGE_CACHE_SIZE <= size)
1228                len = PAGE_CACHE_SIZE;
1229        else
1230                len = size & ~PAGE_CACHE_MASK;
1231
1232        dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
1233             off, len, page, page->index);
1234
1235        lock_page(page);
1236
1237        ret = VM_FAULT_NOPAGE;
1238        if ((off > size) ||
1239            (page->mapping != inode->i_mapping))
1240                goto out;
1241
1242        ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1243        if (ret == 0) {
1244                /* success.  we'll keep the page locked. */
1245                set_page_dirty(page);
1246                up_read(&mdsc->snap_rwsem);
1247                ret = VM_FAULT_LOCKED;
1248        } else {
1249                if (ret == -ENOMEM)
1250                        ret = VM_FAULT_OOM;
1251                else
1252                        ret = VM_FAULT_SIGBUS;
1253        }
1254out:
1255        dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
1256        if (ret != VM_FAULT_LOCKED)
1257                unlock_page(page);
1258        return ret;
1259}
1260
1261static struct vm_operations_struct ceph_vmops = {
1262        .fault          = filemap_fault,
1263        .page_mkwrite   = ceph_page_mkwrite,
1264        .remap_pages    = generic_file_remap_pages,
1265};
1266
1267int ceph_mmap(struct file *file, struct vm_area_struct *vma)
1268{
1269        struct address_space *mapping = file->f_mapping;
1270
1271        if (!mapping->a_ops->readpage)
1272                return -ENOEXEC;
1273        file_accessed(file);
1274        vma->vm_ops = &ceph_vmops;
1275        return 0;
1276}
1277