linux/drivers/block/rbd.c
   1
   2/*
   3   rbd.c -- Export ceph rados objects as a Linux block device
   4
   5
   6   based on drivers/block/osdblk.c:
   7
   8   Copyright 2009 Red Hat, Inc.
   9
  10   This program is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation.
  13
  14   This program is distributed in the hope that it will be useful,
  15   but WITHOUT ANY WARRANTY; without even the implied warranty of
  16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17   GNU General Public License for more details.
  18
  19   You should have received a copy of the GNU General Public License
  20   along with this program; see the file COPYING.  If not, write to
  21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  22
  23
  24
  25   For usage instructions, please refer to:
  26
  27                 Documentation/ABI/testing/sysfs-bus-rbd
  28
  29 */
  30
  31#include <linux/ceph/libceph.h>
  32#include <linux/ceph/osd_client.h>
  33#include <linux/ceph/mon_client.h>
  34#include <linux/ceph/cls_lock_client.h>
  35#include <linux/ceph/striper.h>
  36#include <linux/ceph/decode.h>
  37#include <linux/fs_parser.h>
  38#include <linux/bsearch.h>
  39
  40#include <linux/kernel.h>
  41#include <linux/device.h>
  42#include <linux/module.h>
  43#include <linux/blk-mq.h>
  44#include <linux/fs.h>
  45#include <linux/blkdev.h>
  46#include <linux/slab.h>
  47#include <linux/idr.h>
  48#include <linux/workqueue.h>
  49
  50#include "rbd_types.h"
  51
  52#define RBD_DEBUG       /* Activate rbd_assert() calls */
  53
  54/*
  55 * Increment the given counter and return its updated value.
   56 * If the counter is already 0 it will not be incremented and
   57 * 0 is returned.  If the counter is already at its maximum
   58 * value, -EINVAL is returned without updating it.
  59 */
  60static int atomic_inc_return_safe(atomic_t *v)
  61{
  62        unsigned int counter;
  63
  64        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
  65        if (counter <= (unsigned int)INT_MAX)
  66                return (int)counter;
  67
  68        atomic_dec(v);
  69
  70        return -EINVAL;
  71}
  72
  73/* Decrement the counter.  Return the resulting value, or -EINVAL */
  74static int atomic_dec_return_safe(atomic_t *v)
  75{
  76        int counter;
  77
  78        counter = atomic_dec_return(v);
  79        if (counter >= 0)
  80                return counter;
  81
  82        atomic_inc(v);
  83
  84        return -EINVAL;
  85}
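/*
 * Usage note: these saturating helpers guard reference counts such as
 * rbd_dev->parent_ref (see rbd_dev_parent_get()/rbd_dev_parent_put()
 * below); over- and underflow are reported as -EINVAL so the callers
 * can warn instead of silently corrupting the count.
 */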
  86
  87#define RBD_DRV_NAME "rbd"
  88
  89#define RBD_MINORS_PER_MAJOR            256
  90#define RBD_SINGLE_MAJOR_PART_SHIFT     4
  91
  92#define RBD_MAX_PARENT_CHAIN_LEN        16
  93
  94#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
  95#define RBD_MAX_SNAP_NAME_LEN   \
  96                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
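/*
 * Illustrative arithmetic, assuming the usual NAME_MAX of 255: the
 * "snap_" prefix takes 5 bytes, leaving at most 250 bytes for the
 * snapshot name itself.
 */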
  97
  98#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
  99
 100#define RBD_SNAP_HEAD_NAME      "-"
 101
 102#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
 103
 104/* This allows a single page to hold an image name sent by OSD */
 105#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
 106#define RBD_IMAGE_ID_LEN_MAX    64
 107
 108#define RBD_OBJ_PREFIX_LEN_MAX  64
 109
 110#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
 111#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
 112
 113/* Feature bits */
 114
 115#define RBD_FEATURE_LAYERING            (1ULL<<0)
 116#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
 117#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
 118#define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
 119#define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
 120#define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
 121#define RBD_FEATURE_DATA_POOL           (1ULL<<7)
 122#define RBD_FEATURE_OPERATIONS          (1ULL<<8)
 123
 124#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
 125                                 RBD_FEATURE_STRIPINGV2 |       \
 126                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
 127                                 RBD_FEATURE_OBJECT_MAP |       \
 128                                 RBD_FEATURE_FAST_DIFF |        \
 129                                 RBD_FEATURE_DEEP_FLATTEN |     \
 130                                 RBD_FEATURE_DATA_POOL |        \
 131                                 RBD_FEATURE_OPERATIONS)
 132
 133/* Features supported by this (client software) implementation. */
 134
 135#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
 136
 137/*
 138 * An RBD device name will be "rbd#", where the "rbd" comes from
 139 * RBD_DRV_NAME above, and # is a unique integer identifier.
 140 */
 141#define DEV_NAME_LEN            32
 142
 143/*
 144 * block device image metadata (in-memory version)
 145 */
 146struct rbd_image_header {
 147        /* These six fields never change for a given rbd image */
 148        char *object_prefix;
 149        __u8 obj_order;
 150        u64 stripe_unit;
 151        u64 stripe_count;
 152        s64 data_pool_id;
 153        u64 features;           /* Might be changeable someday? */
 154
 155        /* The remaining fields need to be updated occasionally */
 156        u64 image_size;
 157        struct ceph_snap_context *snapc;
 158        char *snap_names;       /* format 1 only */
 159        u64 *snap_sizes;        /* format 1 only */
 160};
 161
 162/*
 163 * An rbd image specification.
 164 *
 165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 166 * identify an image.  Each rbd_dev structure includes a pointer to
 167 * an rbd_spec structure that encapsulates this identity.
 168 *
 169 * Each of the id's in an rbd_spec has an associated name.  For a
 170 * user-mapped image, the names are supplied and the id's associated
 171 * with them are looked up.  For a layered image, a parent image is
 172 * defined by the tuple, and the names are looked up.
 173 *
 174 * An rbd_dev structure contains a parent_spec pointer which is
 175 * non-null if the image it represents is a child in a layered
 176 * image.  This pointer will refer to the rbd_spec structure used
 177 * by the parent rbd_dev for its own identity (i.e., the structure
 178 * is shared between the parent and child).
 179 *
 180 * Since these structures are populated once, during the discovery
 181 * phase of image construction, they are effectively immutable so
 182 * we make no effort to synchronize access to them.
 183 *
 184 * Note that code herein does not assume the image name is known (it
 185 * could be a null pointer).
 186 */
 187struct rbd_spec {
 188        u64             pool_id;
 189        const char      *pool_name;
 190        const char      *pool_ns;       /* NULL if default, never "" */
 191
 192        const char      *image_id;
 193        const char      *image_name;
 194
 195        u64             snap_id;
 196        const char      *snap_name;
 197
 198        struct kref     kref;
 199};
 200
 201/*
 202 * an instance of the client.  multiple devices may share an rbd client.
 203 */
 204struct rbd_client {
 205        struct ceph_client      *client;
 206        struct kref             kref;
 207        struct list_head        node;
 208};
 209
 210struct pending_result {
 211        int                     result;         /* first nonzero result */
 212        int                     num_pending;
 213};
 214
 215struct rbd_img_request;
 216
 217enum obj_request_type {
 218        OBJ_REQUEST_NODATA = 1,
 219        OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
 220        OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
 221        OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
 222};
 223
 224enum obj_operation_type {
 225        OBJ_OP_READ = 1,
 226        OBJ_OP_WRITE,
 227        OBJ_OP_DISCARD,
 228        OBJ_OP_ZEROOUT,
 229};
 230
 231#define RBD_OBJ_FLAG_DELETION                   (1U << 0)
 232#define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
 233#define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
 234#define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
 235#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
 236
 237enum rbd_obj_read_state {
 238        RBD_OBJ_READ_START = 1,
 239        RBD_OBJ_READ_OBJECT,
 240        RBD_OBJ_READ_PARENT,
 241};
 242
 243/*
 244 * Writes go through the following state machine to deal with
 245 * layering:
 246 *
 247 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 248 *            .                 |                                    .
 249 *            .                 v                                    .
 250 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 251 *            .                 |                    .               .
 252 *            .                 v                    v (deep-copyup  .
 253 *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
 254 * flattened) v                 |                    .               .
 255 *            .                 v                    .               .
 256 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 257 *                              |                        not needed) v
 258 *                              v                                    .
 259 *                            done . . . . . . . . . . . . . . . . . .
 260 *                              ^
 261 *                              |
 262 *                     RBD_OBJ_WRITE_FLAT
 263 *
 264 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 265 * assert_exists guard is needed or not (in some cases it's not needed
 266 * even if there is a parent).
 267 */
 268enum rbd_obj_write_state {
 269        RBD_OBJ_WRITE_START = 1,
 270        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
 271        RBD_OBJ_WRITE_OBJECT,
 272        __RBD_OBJ_WRITE_COPYUP,
 273        RBD_OBJ_WRITE_COPYUP,
 274        RBD_OBJ_WRITE_POST_OBJECT_MAP,
 275};
 276
 277enum rbd_obj_copyup_state {
 278        RBD_OBJ_COPYUP_START = 1,
 279        RBD_OBJ_COPYUP_READ_PARENT,
 280        __RBD_OBJ_COPYUP_OBJECT_MAPS,
 281        RBD_OBJ_COPYUP_OBJECT_MAPS,
 282        __RBD_OBJ_COPYUP_WRITE_OBJECT,
 283        RBD_OBJ_COPYUP_WRITE_OBJECT,
 284};
 285
 286struct rbd_obj_request {
 287        struct ceph_object_extent ex;
 288        unsigned int            flags;  /* RBD_OBJ_FLAG_* */
 289        union {
 290                enum rbd_obj_read_state  read_state;    /* for reads */
 291                enum rbd_obj_write_state write_state;   /* for writes */
 292        };
 293
 294        struct rbd_img_request  *img_request;
 295        struct ceph_file_extent *img_extents;
 296        u32                     num_img_extents;
 297
 298        union {
 299                struct ceph_bio_iter    bio_pos;
 300                struct {
 301                        struct ceph_bvec_iter   bvec_pos;
 302                        u32                     bvec_count;
 303                        u32                     bvec_idx;
 304                };
 305        };
 306
 307        enum rbd_obj_copyup_state copyup_state;
 308        struct bio_vec          *copyup_bvecs;
 309        u32                     copyup_bvec_count;
 310
 311        struct list_head        osd_reqs;       /* w/ r_private_item */
 312
 313        struct mutex            state_mutex;
 314        struct pending_result   pending;
 315        struct kref             kref;
 316};
 317
 318enum img_req_flags {
 319        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
 320        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
 321};
 322
 323enum rbd_img_state {
 324        RBD_IMG_START = 1,
 325        RBD_IMG_EXCLUSIVE_LOCK,
 326        __RBD_IMG_OBJECT_REQUESTS,
 327        RBD_IMG_OBJECT_REQUESTS,
 328};
 329
 330struct rbd_img_request {
 331        struct rbd_device       *rbd_dev;
 332        enum obj_operation_type op_type;
 333        enum obj_request_type   data_type;
 334        unsigned long           flags;
 335        enum rbd_img_state      state;
 336        union {
 337                u64                     snap_id;        /* for reads */
 338                struct ceph_snap_context *snapc;        /* for writes */
 339        };
 340        struct rbd_obj_request  *obj_request;   /* obj req initiator */
 341
 342        struct list_head        lock_item;
 343        struct list_head        object_extents; /* obj_req.ex structs */
 344
 345        struct mutex            state_mutex;
 346        struct pending_result   pending;
 347        struct work_struct      work;
 348        int                     work_result;
 349};
 350
 351#define for_each_obj_request(ireq, oreq) \
 352        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 353#define for_each_obj_request_safe(ireq, oreq, n) \
 354        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 355
 356enum rbd_watch_state {
 357        RBD_WATCH_STATE_UNREGISTERED,
 358        RBD_WATCH_STATE_REGISTERED,
 359        RBD_WATCH_STATE_ERROR,
 360};
 361
 362enum rbd_lock_state {
 363        RBD_LOCK_STATE_UNLOCKED,
 364        RBD_LOCK_STATE_LOCKED,
 365        RBD_LOCK_STATE_RELEASING,
 366};
 367
 368/* WatchNotify::ClientId */
 369struct rbd_client_id {
 370        u64 gid;
 371        u64 handle;
 372};
 373
 374struct rbd_mapping {
 375        u64                     size;
 376};
 377
 378/*
 379 * a single device
 380 */
 381struct rbd_device {
 382        int                     dev_id;         /* blkdev unique id */
 383
 384        int                     major;          /* blkdev assigned major */
 385        int                     minor;
 386        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 387
 388        u32                     image_format;   /* Either 1 or 2 */
 389        struct rbd_client       *rbd_client;
 390
 391        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 392
 393        spinlock_t              lock;           /* queue, flags, open_count */
 394
 395        struct rbd_image_header header;
 396        unsigned long           flags;          /* possibly lock protected */
 397        struct rbd_spec         *spec;
 398        struct rbd_options      *opts;
 399        char                    *config_info;   /* add{,_single_major} string */
 400
 401        struct ceph_object_id   header_oid;
 402        struct ceph_object_locator header_oloc;
 403
 404        struct ceph_file_layout layout;         /* used for all rbd requests */
 405
 406        struct mutex            watch_mutex;
 407        enum rbd_watch_state    watch_state;
 408        struct ceph_osd_linger_request *watch_handle;
 409        u64                     watch_cookie;
 410        struct delayed_work     watch_dwork;
 411
 412        struct rw_semaphore     lock_rwsem;
 413        enum rbd_lock_state     lock_state;
 414        char                    lock_cookie[32];
 415        struct rbd_client_id    owner_cid;
 416        struct work_struct      acquired_lock_work;
 417        struct work_struct      released_lock_work;
 418        struct delayed_work     lock_dwork;
 419        struct work_struct      unlock_work;
 420        spinlock_t              lock_lists_lock;
 421        struct list_head        acquiring_list;
 422        struct list_head        running_list;
 423        struct completion       acquire_wait;
 424        int                     acquire_err;
 425        struct completion       releasing_wait;
 426
 427        spinlock_t              object_map_lock;
 428        u8                      *object_map;
 429        u64                     object_map_size;        /* in objects */
 430        u64                     object_map_flags;
 431
 432        struct workqueue_struct *task_wq;
 433
 434        struct rbd_spec         *parent_spec;
 435        u64                     parent_overlap;
 436        atomic_t                parent_ref;
 437        struct rbd_device       *parent;
 438
 439        /* Block layer tags. */
 440        struct blk_mq_tag_set   tag_set;
 441
 442        /* protects updating the header */
 443        struct rw_semaphore     header_rwsem;
 444
 445        struct rbd_mapping      mapping;
 446
 447        struct list_head        node;
 448
 449        /* sysfs related */
 450        struct device           dev;
 451        unsigned long           open_count;     /* protected by lock */
 452};
 453
 454/*
 455 * Flag bits for rbd_dev->flags:
 456 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 457 *   by rbd_dev->lock
 458 */
 459enum rbd_dev_flags {
 460        RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
 461        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 462        RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
 463};
 464
 465static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 466
 467static LIST_HEAD(rbd_dev_list);    /* devices */
 468static DEFINE_SPINLOCK(rbd_dev_list_lock);
 469
 470static LIST_HEAD(rbd_client_list);              /* clients */
 471static DEFINE_SPINLOCK(rbd_client_list_lock);
 472
 473/* Slab caches for frequently-allocated structures */
 474
 475static struct kmem_cache        *rbd_img_request_cache;
 476static struct kmem_cache        *rbd_obj_request_cache;
 477
 478static int rbd_major;
 479static DEFINE_IDA(rbd_dev_id_ida);
 480
 481static struct workqueue_struct *rbd_wq;
 482
 483static struct ceph_snap_context rbd_empty_snapc = {
 484        .nref = REFCOUNT_INIT(1),
 485};
 486
 487/*
 488 * single-major requires >= 0.75 version of userspace rbd utility.
 489 */
 490static bool single_major = true;
 491module_param(single_major, bool, 0444);
 492MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 493
 494static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
 495static ssize_t remove_store(struct bus_type *bus, const char *buf,
 496                            size_t count);
 497static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
 498                                      size_t count);
 499static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
 500                                         size_t count);
 501static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 502
 503static int rbd_dev_id_to_minor(int dev_id)
 504{
 505        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
 506}
 507
 508static int minor_to_rbd_dev_id(int minor)
 509{
 510        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 511}
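/*
 * Worked example (single-major mode): with RBD_SINGLE_MAJOR_PART_SHIFT
 * of 4 each mapping owns 16 minors, so dev_id 2 starts at minor 32 and
 * minors 32..47 (the whole device plus its partitions) map back to
 * dev_id 2.
 */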
 512
 513static bool rbd_is_ro(struct rbd_device *rbd_dev)
 514{
 515        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
 516}
 517
 518static bool rbd_is_snap(struct rbd_device *rbd_dev)
 519{
 520        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
 521}
 522
 523static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 524{
 525        lockdep_assert_held(&rbd_dev->lock_rwsem);
 526
 527        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 528               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 529}
 530
 531static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
 532{
 533        bool is_lock_owner;
 534
 535        down_read(&rbd_dev->lock_rwsem);
 536        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
 537        up_read(&rbd_dev->lock_rwsem);
 538        return is_lock_owner;
 539}
 540
 541static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 542{
 543        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 544}
 545
 546static BUS_ATTR_WO(add);
 547static BUS_ATTR_WO(remove);
 548static BUS_ATTR_WO(add_single_major);
 549static BUS_ATTR_WO(remove_single_major);
 550static BUS_ATTR_RO(supported_features);
 551
 552static struct attribute *rbd_bus_attrs[] = {
 553        &bus_attr_add.attr,
 554        &bus_attr_remove.attr,
 555        &bus_attr_add_single_major.attr,
 556        &bus_attr_remove_single_major.attr,
 557        &bus_attr_supported_features.attr,
 558        NULL,
 559};
 560
 561static umode_t rbd_bus_is_visible(struct kobject *kobj,
 562                                  struct attribute *attr, int index)
 563{
 564        if (!single_major &&
 565            (attr == &bus_attr_add_single_major.attr ||
 566             attr == &bus_attr_remove_single_major.attr))
 567                return 0;
 568
 569        return attr->mode;
 570}
 571
 572static const struct attribute_group rbd_bus_group = {
 573        .attrs = rbd_bus_attrs,
 574        .is_visible = rbd_bus_is_visible,
 575};
 576__ATTRIBUTE_GROUPS(rbd_bus);
 577
 578static struct bus_type rbd_bus_type = {
 579        .name           = "rbd",
 580        .bus_groups     = rbd_bus_groups,
 581};
 582
 583static void rbd_root_dev_release(struct device *dev)
 584{
 585}
 586
 587static struct device rbd_root_dev = {
 588        .init_name =    "rbd",
 589        .release =      rbd_root_dev_release,
 590};
 591
 592static __printf(2, 3)
 593void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 594{
 595        struct va_format vaf;
 596        va_list args;
 597
 598        va_start(args, fmt);
 599        vaf.fmt = fmt;
 600        vaf.va = &args;
 601
 602        if (!rbd_dev)
 603                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
 604        else if (rbd_dev->disk)
 605                printk(KERN_WARNING "%s: %s: %pV\n",
 606                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
 607        else if (rbd_dev->spec && rbd_dev->spec->image_name)
 608                printk(KERN_WARNING "%s: image %s: %pV\n",
 609                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
 610        else if (rbd_dev->spec && rbd_dev->spec->image_id)
 611                printk(KERN_WARNING "%s: id %s: %pV\n",
 612                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
 613        else    /* punt */
 614                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
 615                        RBD_DRV_NAME, rbd_dev, &vaf);
 616        va_end(args);
 617}
 618
 619#ifdef RBD_DEBUG
 620#define rbd_assert(expr)                                                \
 621                if (unlikely(!(expr))) {                                \
 622                        printk(KERN_ERR "\nAssertion failure in %s() "  \
 623                                                "at line %d:\n\n"       \
 624                                        "\trbd_assert(%s);\n\n",        \
 625                                        __func__, __LINE__, #expr);     \
 626                        BUG();                                          \
 627                }
 628#else /* !RBD_DEBUG */
 629#  define rbd_assert(expr)      ((void) 0)
 630#endif /* !RBD_DEBUG */
 631
 632static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 633
 634static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 635static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
 636static int rbd_dev_header_info(struct rbd_device *rbd_dev);
 637static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 638static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 639                                        u64 snap_id);
 640static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 641                                u8 *order, u64 *snap_size);
 642static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
 643
 644static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
 645static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
 646
 647/*
 648 * Return true if nothing else is pending.
 649 */
 650static bool pending_result_dec(struct pending_result *pending, int *result)
 651{
 652        rbd_assert(pending->num_pending > 0);
 653
 654        if (*result && !pending->result)
 655                pending->result = *result;
 656        if (--pending->num_pending)
 657                return false;
 658
 659        *result = pending->result;
 660        return true;
 661}
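/*
 * Illustrative example: a request that fans out into three
 * sub-requests starts with num_pending == 3.  If the completions come
 * back with 0, -ENOENT and 0 (in any order), the first nonzero result
 * (-ENOENT) is latched and reported once the last completion drops
 * num_pending to 0.
 */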
 662
 663static int rbd_open(struct block_device *bdev, fmode_t mode)
 664{
 665        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 666        bool removing = false;
 667
 668        spin_lock_irq(&rbd_dev->lock);
 669        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 670                removing = true;
 671        else
 672                rbd_dev->open_count++;
 673        spin_unlock_irq(&rbd_dev->lock);
 674        if (removing)
 675                return -ENOENT;
 676
 677        (void) get_device(&rbd_dev->dev);
 678
 679        return 0;
 680}
 681
 682static void rbd_release(struct gendisk *disk, fmode_t mode)
 683{
 684        struct rbd_device *rbd_dev = disk->private_data;
 685        unsigned long open_count_before;
 686
 687        spin_lock_irq(&rbd_dev->lock);
 688        open_count_before = rbd_dev->open_count--;
 689        spin_unlock_irq(&rbd_dev->lock);
 690        rbd_assert(open_count_before > 0);
 691
 692        put_device(&rbd_dev->dev);
 693}
 694
 695static const struct block_device_operations rbd_bd_ops = {
 696        .owner                  = THIS_MODULE,
 697        .open                   = rbd_open,
 698        .release                = rbd_release,
 699};
 700
 701/*
 702 * Initialize an rbd client instance.  Success or not, this function
 703 * consumes ceph_opts.  Caller holds client_mutex.
 704 */
 705static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 706{
 707        struct rbd_client *rbdc;
 708        int ret = -ENOMEM;
 709
 710        dout("%s:\n", __func__);
 711        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 712        if (!rbdc)
 713                goto out_opt;
 714
 715        kref_init(&rbdc->kref);
 716        INIT_LIST_HEAD(&rbdc->node);
 717
 718        rbdc->client = ceph_create_client(ceph_opts, rbdc);
 719        if (IS_ERR(rbdc->client))
 720                goto out_rbdc;
 721        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 722
 723        ret = ceph_open_session(rbdc->client);
 724        if (ret < 0)
 725                goto out_client;
 726
 727        spin_lock(&rbd_client_list_lock);
 728        list_add_tail(&rbdc->node, &rbd_client_list);
 729        spin_unlock(&rbd_client_list_lock);
 730
 731        dout("%s: rbdc %p\n", __func__, rbdc);
 732
 733        return rbdc;
 734out_client:
 735        ceph_destroy_client(rbdc->client);
 736out_rbdc:
 737        kfree(rbdc);
 738out_opt:
 739        if (ceph_opts)
 740                ceph_destroy_options(ceph_opts);
 741        dout("%s: error %d\n", __func__, ret);
 742
 743        return ERR_PTR(ret);
 744}
 745
 746static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
 747{
 748        kref_get(&rbdc->kref);
 749
 750        return rbdc;
 751}
 752
 753/*
 754 * Find a ceph client with specific addr and configuration.  If
 755 * found, bump its reference count.
 756 */
 757static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 758{
 759        struct rbd_client *client_node;
 760        bool found = false;
 761
 762        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 763                return NULL;
 764
 765        spin_lock(&rbd_client_list_lock);
 766        list_for_each_entry(client_node, &rbd_client_list, node) {
 767                if (!ceph_compare_options(ceph_opts, client_node->client)) {
 768                        __rbd_get_client(client_node);
 769
 770                        found = true;
 771                        break;
 772                }
 773        }
 774        spin_unlock(&rbd_client_list_lock);
 775
 776        return found ? client_node : NULL;
 777}
 778
 779/*
 780 * (Per device) rbd map options
 781 */
 782enum {
 783        Opt_queue_depth,
 784        Opt_alloc_size,
 785        Opt_lock_timeout,
 786        /* int args above */
 787        Opt_pool_ns,
 788        Opt_compression_hint,
 789        /* string args above */
 790        Opt_read_only,
 791        Opt_read_write,
 792        Opt_lock_on_read,
 793        Opt_exclusive,
 794        Opt_notrim,
 795};
 796
 797enum {
 798        Opt_compression_hint_none,
 799        Opt_compression_hint_compressible,
 800        Opt_compression_hint_incompressible,
 801};
 802
 803static const struct constant_table rbd_param_compression_hint[] = {
 804        {"none",                Opt_compression_hint_none},
 805        {"compressible",        Opt_compression_hint_compressible},
 806        {"incompressible",      Opt_compression_hint_incompressible},
 807        {}
 808};
 809
 810static const struct fs_parameter_spec rbd_parameters[] = {
 811        fsparam_u32     ("alloc_size",                  Opt_alloc_size),
 812        fsparam_enum    ("compression_hint",            Opt_compression_hint,
 813                         rbd_param_compression_hint),
 814        fsparam_flag    ("exclusive",                   Opt_exclusive),
 815        fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
 816        fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
 817        fsparam_flag    ("notrim",                      Opt_notrim),
 818        fsparam_string  ("_pool_ns",                    Opt_pool_ns),
 819        fsparam_u32     ("queue_depth",                 Opt_queue_depth),
 820        fsparam_flag    ("read_only",                   Opt_read_only),
 821        fsparam_flag    ("read_write",                  Opt_read_write),
 822        fsparam_flag    ("ro",                          Opt_read_only),
 823        fsparam_flag    ("rw",                          Opt_read_write),
 824        {}
 825};
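/*
 * Illustrative example: a map option string such as
 * "queue_depth=128,alloc_size=65536,lock_on_read,compression_hint=compressible"
 * is parsed against the table above; the "ro"/"rw" flags simply toggle
 * the read_only field of struct rbd_options below.
 */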
 826
 827struct rbd_options {
 828        int     queue_depth;
 829        int     alloc_size;
 830        unsigned long   lock_timeout;
 831        bool    read_only;
 832        bool    lock_on_read;
 833        bool    exclusive;
 834        bool    trim;
 835
 836        u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 837};
 838
 839#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 840#define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
 841#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 842#define RBD_READ_ONLY_DEFAULT   false
 843#define RBD_LOCK_ON_READ_DEFAULT false
 844#define RBD_EXCLUSIVE_DEFAULT   false
 845#define RBD_TRIM_DEFAULT        true
 846
 847struct rbd_parse_opts_ctx {
 848        struct rbd_spec         *spec;
 849        struct ceph_options     *copts;
 850        struct rbd_options      *opts;
 851};
 852
 853static char* obj_op_name(enum obj_operation_type op_type)
 854{
 855        switch (op_type) {
 856        case OBJ_OP_READ:
 857                return "read";
 858        case OBJ_OP_WRITE:
 859                return "write";
 860        case OBJ_OP_DISCARD:
 861                return "discard";
 862        case OBJ_OP_ZEROOUT:
 863                return "zeroout";
 864        default:
 865                return "???";
 866        }
 867}
 868
 869/*
 870 * Destroy ceph client
 871 *
  872 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 873 */
 874static void rbd_client_release(struct kref *kref)
 875{
 876        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 877
 878        dout("%s: rbdc %p\n", __func__, rbdc);
 879        spin_lock(&rbd_client_list_lock);
 880        list_del(&rbdc->node);
 881        spin_unlock(&rbd_client_list_lock);
 882
 883        ceph_destroy_client(rbdc->client);
 884        kfree(rbdc);
 885}
 886
 887/*
 888 * Drop reference to ceph client node. If it's not referenced anymore, release
 889 * it.
 890 */
 891static void rbd_put_client(struct rbd_client *rbdc)
 892{
 893        if (rbdc)
 894                kref_put(&rbdc->kref, rbd_client_release);
 895}
 896
 897/*
 898 * Get a ceph client with specific addr and configuration, if one does
 899 * not exist create it.  Either way, ceph_opts is consumed by this
 900 * function.
 901 */
 902static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 903{
 904        struct rbd_client *rbdc;
 905        int ret;
 906
 907        mutex_lock(&client_mutex);
 908        rbdc = rbd_client_find(ceph_opts);
 909        if (rbdc) {
 910                ceph_destroy_options(ceph_opts);
 911
 912                /*
 913                 * Using an existing client.  Make sure ->pg_pools is up to
 914                 * date before we look up the pool id in do_rbd_add().
 915                 */
 916                ret = ceph_wait_for_latest_osdmap(rbdc->client,
 917                                        rbdc->client->options->mount_timeout);
 918                if (ret) {
 919                        rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
 920                        rbd_put_client(rbdc);
 921                        rbdc = ERR_PTR(ret);
 922                }
 923        } else {
 924                rbdc = rbd_client_create(ceph_opts);
 925        }
 926        mutex_unlock(&client_mutex);
 927
 928        return rbdc;
 929}
 930
 931static bool rbd_image_format_valid(u32 image_format)
 932{
 933        return image_format == 1 || image_format == 2;
 934}
 935
 936static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 937{
 938        size_t size;
 939        u32 snap_count;
 940
 941        /* The header has to start with the magic rbd header text */
 942        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 943                return false;
 944
 945        /* The bio layer requires at least sector-sized I/O */
 946
 947        if (ondisk->options.order < SECTOR_SHIFT)
 948                return false;
 949
 950        /* If we use u64 in a few spots we may be able to loosen this */
 951
 952        if (ondisk->options.order > 8 * sizeof (int) - 1)
 953                return false;
 954
 955        /*
 956         * The size of a snapshot header has to fit in a size_t, and
 957         * that limits the number of snapshots.
 958         */
 959        snap_count = le32_to_cpu(ondisk->snap_count);
 960        size = SIZE_MAX - sizeof (struct ceph_snap_context);
 961        if (snap_count > size / sizeof (__le64))
 962                return false;
 963
 964        /*
  965         * Not only that, but the size of the entire snapshot
 966         * header must also be representable in a size_t.
 967         */
 968        size -= snap_count * sizeof (__le64);
 969        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
 970                return false;
 971
 972        return true;
 973}
 974
 975/*
 976 * returns the size of an object in the image
 977 */
 978static u32 rbd_obj_bytes(struct rbd_image_header *header)
 979{
 980        return 1U << header->obj_order;
 981}
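/*
 * For example, the common default object order of 22 gives 4 MiB
 * objects; the format 1 header check in rbd_dev_ondisk_valid() above
 * requires at least order 9 (one 512-byte sector).
 */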
 982
 983static void rbd_init_layout(struct rbd_device *rbd_dev)
 984{
 985        if (rbd_dev->header.stripe_unit == 0 ||
 986            rbd_dev->header.stripe_count == 0) {
 987                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
 988                rbd_dev->header.stripe_count = 1;
 989        }
 990
 991        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
 992        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
 993        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
 994        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
 995                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
 996        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 997}
 998
 999/*
1000 * Fill an rbd image header with information from the given format 1
1001 * on-disk header.
1002 */
1003static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1004                                 struct rbd_image_header_ondisk *ondisk)
1005{
1006        struct rbd_image_header *header = &rbd_dev->header;
1007        bool first_time = header->object_prefix == NULL;
1008        struct ceph_snap_context *snapc;
1009        char *object_prefix = NULL;
1010        char *snap_names = NULL;
1011        u64 *snap_sizes = NULL;
1012        u32 snap_count;
1013        int ret = -ENOMEM;
1014        u32 i;
1015
1016        /* Allocate this now to avoid having to handle failure below */
1017
1018        if (first_time) {
1019                object_prefix = kstrndup(ondisk->object_prefix,
1020                                         sizeof(ondisk->object_prefix),
1021                                         GFP_KERNEL);
1022                if (!object_prefix)
1023                        return -ENOMEM;
1024        }
1025
1026        /* Allocate the snapshot context and fill it in */
1027
1028        snap_count = le32_to_cpu(ondisk->snap_count);
1029        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1030        if (!snapc)
1031                goto out_err;
1032        snapc->seq = le64_to_cpu(ondisk->snap_seq);
1033        if (snap_count) {
1034                struct rbd_image_snap_ondisk *snaps;
1035                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1036
1037                /* We'll keep a copy of the snapshot names... */
1038
1039                if (snap_names_len > (u64)SIZE_MAX)
1040                        goto out_2big;
1041                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1042                if (!snap_names)
1043                        goto out_err;
1044
1045                /* ...as well as the array of their sizes. */
1046                snap_sizes = kmalloc_array(snap_count,
1047                                           sizeof(*header->snap_sizes),
1048                                           GFP_KERNEL);
1049                if (!snap_sizes)
1050                        goto out_err;
1051
1052                /*
1053                 * Copy the names, and fill in each snapshot's id
1054                 * and size.
1055                 *
1056                 * Note that rbd_dev_v1_header_info() guarantees the
1057                 * ondisk buffer we're working with has
1058                 * snap_names_len bytes beyond the end of the
 1059                 * snapshot id array, so this memcpy() is safe.
1060                 */
1061                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1062                snaps = ondisk->snaps;
1063                for (i = 0; i < snap_count; i++) {
1064                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1065                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1066                }
1067        }
1068
1069        /* We won't fail any more, fill in the header */
1070
1071        if (first_time) {
1072                header->object_prefix = object_prefix;
1073                header->obj_order = ondisk->options.order;
1074                rbd_init_layout(rbd_dev);
1075        } else {
1076                ceph_put_snap_context(header->snapc);
1077                kfree(header->snap_names);
1078                kfree(header->snap_sizes);
1079        }
1080
1081        /* The remaining fields always get updated (when we refresh) */
1082
1083        header->image_size = le64_to_cpu(ondisk->image_size);
1084        header->snapc = snapc;
1085        header->snap_names = snap_names;
1086        header->snap_sizes = snap_sizes;
1087
1088        return 0;
1089out_2big:
1090        ret = -EIO;
1091out_err:
1092        kfree(snap_sizes);
1093        kfree(snap_names);
1094        ceph_put_snap_context(snapc);
1095        kfree(object_prefix);
1096
1097        return ret;
1098}
1099
1100static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1101{
1102        const char *snap_name;
1103
1104        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1105
1106        /* Skip over names until we find the one we are looking for */
1107
1108        snap_name = rbd_dev->header.snap_names;
1109        while (which--)
1110                snap_name += strlen(snap_name) + 1;
1111
1112        return kstrdup(snap_name, GFP_KERNEL);
1113}
1114
1115/*
1116 * Snapshot id comparison function for use with qsort()/bsearch().
1117 * Note that result is for snapshots in *descending* order.
1118 */
1119static int snapid_compare_reverse(const void *s1, const void *s2)
1120{
1121        u64 snap_id1 = *(u64 *)s1;
1122        u64 snap_id2 = *(u64 *)s2;
1123
1124        if (snap_id1 < snap_id2)
1125                return 1;
1126        return snap_id1 == snap_id2 ? 0 : -1;
1127}
1128
1129/*
1130 * Search a snapshot context to see if the given snapshot id is
1131 * present.
1132 *
1133 * Returns the position of the snapshot id in the array if it's found,
1134 * or BAD_SNAP_INDEX otherwise.
1135 *
 1136 * Note: The snapshot array is kept sorted (by the osd) in
1137 * reverse order, highest snapshot id first.
1138 */
1139static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1140{
1141        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1142        u64 *found;
1143
1144        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1145                                sizeof (snap_id), snapid_compare_reverse);
1146
1147        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1148}
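/*
 * Illustrative example: with snapc->snaps = { 12, 7, 3 } (descending,
 * as the OSD keeps it), looking up snap_id 7 yields index 1, while
 * looking up snap_id 5 yields BAD_SNAP_INDEX.
 */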
1149
1150static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1151                                        u64 snap_id)
1152{
1153        u32 which;
1154        const char *snap_name;
1155
1156        which = rbd_dev_snap_index(rbd_dev, snap_id);
1157        if (which == BAD_SNAP_INDEX)
1158                return ERR_PTR(-ENOENT);
1159
1160        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1161        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1162}
1163
1164static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1165{
1166        if (snap_id == CEPH_NOSNAP)
1167                return RBD_SNAP_HEAD_NAME;
1168
1169        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1170        if (rbd_dev->image_format == 1)
1171                return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1172
1173        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1174}
1175
1176static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1177                                u64 *snap_size)
1178{
1179        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1180        if (snap_id == CEPH_NOSNAP) {
1181                *snap_size = rbd_dev->header.image_size;
1182        } else if (rbd_dev->image_format == 1) {
1183                u32 which;
1184
1185                which = rbd_dev_snap_index(rbd_dev, snap_id);
1186                if (which == BAD_SNAP_INDEX)
1187                        return -ENOENT;
1188
1189                *snap_size = rbd_dev->header.snap_sizes[which];
1190        } else {
1191                u64 size = 0;
1192                int ret;
1193
1194                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1195                if (ret)
1196                        return ret;
1197
1198                *snap_size = size;
1199        }
1200        return 0;
1201}
1202
1203static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1204{
1205        u64 snap_id = rbd_dev->spec->snap_id;
1206        u64 size = 0;
1207        int ret;
1208
1209        ret = rbd_snap_size(rbd_dev, snap_id, &size);
1210        if (ret)
1211                return ret;
1212
1213        rbd_dev->mapping.size = size;
1214        return 0;
1215}
1216
1217static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1218{
1219        rbd_dev->mapping.size = 0;
1220}
1221
1222static void zero_bvec(struct bio_vec *bv)
1223{
1224        void *buf;
1225        unsigned long flags;
1226
1227        buf = bvec_kmap_irq(bv, &flags);
1228        memset(buf, 0, bv->bv_len);
1229        flush_dcache_page(bv->bv_page);
1230        bvec_kunmap_irq(buf, &flags);
1231}
1232
1233static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1234{
1235        struct ceph_bio_iter it = *bio_pos;
1236
1237        ceph_bio_iter_advance(&it, off);
1238        ceph_bio_iter_advance_step(&it, bytes, ({
1239                zero_bvec(&bv);
1240        }));
1241}
1242
1243static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1244{
1245        struct ceph_bvec_iter it = *bvec_pos;
1246
1247        ceph_bvec_iter_advance(&it, off);
1248        ceph_bvec_iter_advance_step(&it, bytes, ({
1249                zero_bvec(&bv);
1250        }));
1251}
1252
1253/*
1254 * Zero a range in @obj_req data buffer defined by a bio (list) or
1255 * (private) bio_vec array.
1256 *
1257 * @off is relative to the start of the data buffer.
1258 */
1259static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1260                               u32 bytes)
1261{
1262        dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1263
1264        switch (obj_req->img_request->data_type) {
1265        case OBJ_REQUEST_BIO:
1266                zero_bios(&obj_req->bio_pos, off, bytes);
1267                break;
1268        case OBJ_REQUEST_BVECS:
1269        case OBJ_REQUEST_OWN_BVECS:
1270                zero_bvecs(&obj_req->bvec_pos, off, bytes);
1271                break;
1272        default:
1273                BUG();
1274        }
1275}
1276
1277static void rbd_obj_request_destroy(struct kref *kref);
1278static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1279{
1280        rbd_assert(obj_request != NULL);
1281        dout("%s: obj %p (was %d)\n", __func__, obj_request,
1282                kref_read(&obj_request->kref));
1283        kref_put(&obj_request->kref, rbd_obj_request_destroy);
1284}
1285
1286static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1287                                        struct rbd_obj_request *obj_request)
1288{
1289        rbd_assert(obj_request->img_request == NULL);
1290
1291        /* Image request now owns object's original reference */
1292        obj_request->img_request = img_request;
1293        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1294}
1295
1296static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1297                                        struct rbd_obj_request *obj_request)
1298{
1299        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1300        list_del(&obj_request->ex.oe_item);
1301        rbd_assert(obj_request->img_request == img_request);
1302        rbd_obj_request_put(obj_request);
1303}
1304
1305static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1306{
1307        struct rbd_obj_request *obj_req = osd_req->r_priv;
1308
1309        dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1310             __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1311             obj_req->ex.oe_off, obj_req->ex.oe_len);
1312        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1313}
1314
1315/*
1316 * The default/initial value for all image request flags is 0.  Each
1317 * is conditionally set to 1 at image request initialization time
 1318 * and currently never changes thereafter.
1319 */
1320static void img_request_layered_set(struct rbd_img_request *img_request)
1321{
1322        set_bit(IMG_REQ_LAYERED, &img_request->flags);
1323}
1324
1325static bool img_request_layered_test(struct rbd_img_request *img_request)
1326{
1327        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1328}
1329
1330static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1331{
1332        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1333
1334        return !obj_req->ex.oe_off &&
1335               obj_req->ex.oe_len == rbd_dev->layout.object_size;
1336}
1337
1338static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1339{
1340        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1341
1342        return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1343                                        rbd_dev->layout.object_size;
1344}
1345
1346/*
1347 * Must be called after rbd_obj_calc_img_extents().
1348 */
1349static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1350{
1351        if (!obj_req->num_img_extents ||
1352            (rbd_obj_is_entire(obj_req) &&
1353             !obj_req->img_request->snapc->num_snaps))
1354                return false;
1355
1356        return true;
1357}
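/*
 * In other words: copyup is enabled only when the object has parent
 * data to preserve (num_img_extents != 0), unless this write covers
 * the entire object and there are no snapshots that would still need
 * that parent data (deep-copyup).
 */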
1358
1359static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1360{
1361        return ceph_file_extents_bytes(obj_req->img_extents,
1362                                       obj_req->num_img_extents);
1363}
1364
1365static bool rbd_img_is_write(struct rbd_img_request *img_req)
1366{
1367        switch (img_req->op_type) {
1368        case OBJ_OP_READ:
1369                return false;
1370        case OBJ_OP_WRITE:
1371        case OBJ_OP_DISCARD:
1372        case OBJ_OP_ZEROOUT:
1373                return true;
1374        default:
1375                BUG();
1376        }
1377}
1378
1379static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1380{
1381        struct rbd_obj_request *obj_req = osd_req->r_priv;
1382        int result;
1383
1384        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1385             osd_req->r_result, obj_req);
1386
1387        /*
1388         * Writes aren't allowed to return a data payload.  In some
1389         * guarded write cases (e.g. stat + zero on an empty object)
1390         * a stat response makes it through, but we don't care.
1391         */
1392        if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1393                result = 0;
1394        else
1395                result = osd_req->r_result;
1396
1397        rbd_obj_handle_request(obj_req, result);
1398}
1399
1400static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1401{
1402        struct rbd_obj_request *obj_request = osd_req->r_priv;
1403        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1404        struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1405
1406        osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1407        osd_req->r_snapid = obj_request->img_request->snap_id;
1408}
1409
1410static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1411{
1412        struct rbd_obj_request *obj_request = osd_req->r_priv;
1413
1414        osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1415        ktime_get_real_ts64(&osd_req->r_mtime);
1416        osd_req->r_data_offset = obj_request->ex.oe_off;
1417}
1418
1419static struct ceph_osd_request *
1420__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1421                          struct ceph_snap_context *snapc, int num_ops)
1422{
1423        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1424        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1425        struct ceph_osd_request *req;
1426        const char *name_format = rbd_dev->image_format == 1 ?
1427                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1428        int ret;
1429
1430        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1431        if (!req)
1432                return ERR_PTR(-ENOMEM);
1433
1434        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1435        req->r_callback = rbd_osd_req_callback;
1436        req->r_priv = obj_req;
1437
1438        /*
1439         * Data objects may be stored in a separate pool, but always in
1440         * the same namespace in that pool as the header in its pool.
1441         */
1442        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1443        req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1444
1445        ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1446                               rbd_dev->header.object_prefix,
1447                               obj_req->ex.oe_objno);
1448        if (ret)
1449                return ERR_PTR(ret);
1450
1451        return req;
1452}
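/*
 * Naming sketch, assuming the usual RBD_V2_DATA_FORMAT of
 * "%s.%016llx": a format 2 image whose object_prefix is, say,
 * "rbd_data.102a6b8b4567" stores object number 0x2a in the OSD object
 * "rbd_data.102a6b8b4567.000000000000002a", located in the data pool
 * but in the same namespace as the header (see above).
 */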
1453
1454static struct ceph_osd_request *
1455rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1456{
1457        return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1458                                         num_ops);
1459}
1460
1461static struct rbd_obj_request *rbd_obj_request_create(void)
1462{
1463        struct rbd_obj_request *obj_request;
1464
1465        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1466        if (!obj_request)
1467                return NULL;
1468
1469        ceph_object_extent_init(&obj_request->ex);
1470        INIT_LIST_HEAD(&obj_request->osd_reqs);
1471        mutex_init(&obj_request->state_mutex);
1472        kref_init(&obj_request->kref);
1473
1474        dout("%s %p\n", __func__, obj_request);
1475        return obj_request;
1476}
1477
1478static void rbd_obj_request_destroy(struct kref *kref)
1479{
1480        struct rbd_obj_request *obj_request;
1481        struct ceph_osd_request *osd_req;
1482        u32 i;
1483
1484        obj_request = container_of(kref, struct rbd_obj_request, kref);
1485
1486        dout("%s: obj %p\n", __func__, obj_request);
1487
1488        while (!list_empty(&obj_request->osd_reqs)) {
1489                osd_req = list_first_entry(&obj_request->osd_reqs,
1490                                    struct ceph_osd_request, r_private_item);
1491                list_del_init(&osd_req->r_private_item);
1492                ceph_osdc_put_request(osd_req);
1493        }
1494
1495        switch (obj_request->img_request->data_type) {
1496        case OBJ_REQUEST_NODATA:
1497        case OBJ_REQUEST_BIO:
1498        case OBJ_REQUEST_BVECS:
1499                break;          /* Nothing to do */
1500        case OBJ_REQUEST_OWN_BVECS:
1501                kfree(obj_request->bvec_pos.bvecs);
1502                break;
1503        default:
1504                BUG();
1505        }
1506
1507        kfree(obj_request->img_extents);
1508        if (obj_request->copyup_bvecs) {
1509                for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1510                        if (obj_request->copyup_bvecs[i].bv_page)
1511                                __free_page(obj_request->copyup_bvecs[i].bv_page);
1512                }
1513                kfree(obj_request->copyup_bvecs);
1514        }
1515
1516        kmem_cache_free(rbd_obj_request_cache, obj_request);
1517}
1518
1519/* It's OK to call this for a device with no parent */
1520
1521static void rbd_spec_put(struct rbd_spec *spec);
1522static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1523{
1524        rbd_dev_remove_parent(rbd_dev);
1525        rbd_spec_put(rbd_dev->parent_spec);
1526        rbd_dev->parent_spec = NULL;
1527        rbd_dev->parent_overlap = 0;
1528}
1529
1530/*
1531 * Parent image reference counting is used to determine when an
1532 * image's parent fields can be safely torn down--after there are no
1533 * more in-flight requests to the parent image.  When the last
1534 * reference is dropped, cleaning them up is safe.
1535 */
1536static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1537{
1538        int counter;
1539
1540        if (!rbd_dev->parent_spec)
1541                return;
1542
1543        counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1544        if (counter > 0)
1545                return;
1546
1547        /* Last reference; clean up parent data structures */
1548
1549        if (!counter)
1550                rbd_dev_unparent(rbd_dev);
1551        else
1552                rbd_warn(rbd_dev, "parent reference underflow");
1553}
1554
1555/*
1556 * If an image has a non-zero parent overlap, get a reference to its
1557 * parent.
1558 *
1559 * Returns true if the rbd device has a parent with a non-zero
1560 * overlap and a reference for it was successfully taken, or
1561 * false otherwise.
1562 */
1563static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1564{
1565        int counter = 0;
1566
1567        if (!rbd_dev->parent_spec)
1568                return false;
1569
1570        if (rbd_dev->parent_overlap)
1571                counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1572
1573        if (counter < 0)
1574                rbd_warn(rbd_dev, "parent reference overflow");
1575
1576        return counter > 0;
1577}
1578
1579static void rbd_img_request_init(struct rbd_img_request *img_request,
1580                                 struct rbd_device *rbd_dev,
1581                                 enum obj_operation_type op_type)
1582{
1583        memset(img_request, 0, sizeof(*img_request));
1584
1585        img_request->rbd_dev = rbd_dev;
1586        img_request->op_type = op_type;
1587
1588        INIT_LIST_HEAD(&img_request->lock_item);
1589        INIT_LIST_HEAD(&img_request->object_extents);
1590        mutex_init(&img_request->state_mutex);
1591}
1592
1593static void rbd_img_capture_header(struct rbd_img_request *img_req)
1594{
1595        struct rbd_device *rbd_dev = img_req->rbd_dev;
1596
1597        lockdep_assert_held(&rbd_dev->header_rwsem);
1598
1599        if (rbd_img_is_write(img_req))
1600                img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1601        else
1602                img_req->snap_id = rbd_dev->spec->snap_id;
1603
1604        if (rbd_dev_parent_get(rbd_dev))
1605                img_request_layered_set(img_req);
1606}
1607
1608static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1609{
1610        struct rbd_obj_request *obj_request;
1611        struct rbd_obj_request *next_obj_request;
1612
1613        dout("%s: img %p\n", __func__, img_request);
1614
1615        WARN_ON(!list_empty(&img_request->lock_item));
1616        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1617                rbd_img_obj_request_del(img_request, obj_request);
1618
1619        if (img_request_layered_test(img_request))
1620                rbd_dev_parent_put(img_request->rbd_dev);
1621
1622        if (rbd_img_is_write(img_request))
1623                ceph_put_snap_context(img_request->snapc);
1624
1625        if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1626                kmem_cache_free(rbd_img_request_cache, img_request);
1627}
1628
1629#define BITS_PER_OBJ    2
1630#define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1631#define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1632
1633static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1634                                   u64 *index, u8 *shift)
1635{
1636        u32 off;
1637
1638        rbd_assert(objno < rbd_dev->object_map_size);
1639        *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1640        *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1641}
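
/*
 * For example, with BITS_PER_OBJ == 2 there are OBJS_PER_BYTE == 4 object
 * states per byte, packed most significant bits first.  For objno == 5:
 *
 *     index = 5 / 4 = 1, off = 5 % 4 = 1
 *     shift = (4 - 1 - 1) * BITS_PER_OBJ = 4
 *
 * i.e. object 5's two state bits sit at bits 5..4 of object_map[1].
 */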
1642
1643static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1644{
1645        u64 index;
1646        u8 shift;
1647
1648        lockdep_assert_held(&rbd_dev->object_map_lock);
1649        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1650        return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1651}
1652
1653static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1654{
1655        u64 index;
1656        u8 shift;
1657        u8 *p;
1658
1659        lockdep_assert_held(&rbd_dev->object_map_lock);
1660        rbd_assert(!(val & ~OBJ_MASK));
1661
1662        __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1663        p = &rbd_dev->object_map[index];
1664        *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1665}
1666
1667static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1668{
1669        u8 state;
1670
1671        spin_lock(&rbd_dev->object_map_lock);
1672        state = __rbd_object_map_get(rbd_dev, objno);
1673        spin_unlock(&rbd_dev->object_map_lock);
1674        return state;
1675}
1676
1677static bool use_object_map(struct rbd_device *rbd_dev)
1678{
1679        /*
1680         * An image mapped read-only can't use the object map -- it isn't
1681         * loaded because the header lock isn't acquired.  Someone else can
1682         * write to the image and update the object map behind our back.
1683         *
1684         * A snapshot can't be written to, so using the object map is always
1685         * safe.
1686         */
1687        if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1688                return false;
1689
1690        return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1691                !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1692}
1693
1694static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1695{
1696        u8 state;
1697
1698        /* fall back to default logic if object map is disabled or invalid */
1699        if (!use_object_map(rbd_dev))
1700                return true;
1701
1702        state = rbd_object_map_get(rbd_dev, objno);
1703        return state != OBJECT_NONEXISTENT;
1704}
1705
1706static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1707                                struct ceph_object_id *oid)
1708{
1709        if (snap_id == CEPH_NOSNAP)
1710                ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1711                                rbd_dev->spec->image_id);
1712        else
1713                ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1714                                rbd_dev->spec->image_id, snap_id);
1715}
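
/*
 * For example, assuming RBD_OBJECT_MAP_PREFIX expands to "rbd_object_map."
 * (see rbd_types.h), an image with image_id "abcdef" keeps its HEAD object
 * map in "rbd_object_map.abcdef" and the object map of snap_id 22 in
 * "rbd_object_map.abcdef.0000000000000016".
 */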
1716
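/*
 * Grab the exclusive cls lock on the HEAD object map object.  If another
 * client holds the lock, break it (at most once) and retry; -EEXIST from
 * the OSD means we already own the lock and is treated as success.
 */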
1717static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1718{
1719        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1720        CEPH_DEFINE_OID_ONSTACK(oid);
1721        u8 lock_type;
1722        char *lock_tag;
1723        struct ceph_locker *lockers;
1724        u32 num_lockers;
1725        bool broke_lock = false;
1726        int ret;
1727
1728        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1729
1730again:
1731        ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1732                            CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1733        if (ret != -EBUSY || broke_lock) {
1734                if (ret == -EEXIST)
1735                        ret = 0; /* already locked by myself */
1736                if (ret)
1737                        rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1738                return ret;
1739        }
1740
1741        ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1742                                 RBD_LOCK_NAME, &lock_type, &lock_tag,
1743                                 &lockers, &num_lockers);
1744        if (ret) {
1745                if (ret == -ENOENT)
1746                        goto again;
1747
1748                rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1749                return ret;
1750        }
1751
1752        kfree(lock_tag);
1753        if (num_lockers == 0)
1754                goto again;
1755
1756        rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1757                 ENTITY_NAME(lockers[0].id.name));
1758
1759        ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1760                                  RBD_LOCK_NAME, lockers[0].id.cookie,
1761                                  &lockers[0].id.name);
1762        ceph_free_lockers(lockers, num_lockers);
1763        if (ret) {
1764                if (ret == -ENOENT)
1765                        goto again;
1766
1767                rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1768                return ret;
1769        }
1770
1771        broke_lock = true;
1772        goto again;
1773}
1774
1775static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1776{
1777        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1778        CEPH_DEFINE_OID_ONSTACK(oid);
1779        int ret;
1780
1781        rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1782
1783        ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1784                              "");
1785        if (ret && ret != -ENOENT)
1786                rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1787}
1788
1789static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1790{
1791        u8 struct_v;
1792        u32 struct_len;
1793        u32 header_len;
1794        void *header_end;
1795        int ret;
1796
1797        ceph_decode_32_safe(p, end, header_len, e_inval);
1798        header_end = *p + header_len;
1799
1800        ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1801                                  &struct_len);
1802        if (ret)
1803                return ret;
1804
1805        ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1806
1807        *p = header_end;
1808        return 0;
1809
1810e_inval:
1811        return -EINVAL;
1812}
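
/*
 * Informal sketch of the buffer decoded above (derived from the decode
 * calls, not an authoritative description of the cls BitVector encoding):
 *
 *     le32 header_len;               // length of the header that follows
 *     u8 struct_v, struct_compat;    // consumed by ceph_start_decoding()
 *     le32 struct_len;
 *     le64 object_map_size;          // number of objects covered
 *     ...                            // rest of the header, skipped
 *     <bit vector data starts at header_end>
 */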
1813
1814static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1815{
1816        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1817        CEPH_DEFINE_OID_ONSTACK(oid);
1818        struct page **pages;
1819        void *p, *end;
1820        size_t reply_len;
1821        u64 num_objects;
1822        u64 object_map_bytes;
1823        u64 object_map_size;
1824        int num_pages;
1825        int ret;
1826
1827        rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1828
1829        num_objects = ceph_get_num_objects(&rbd_dev->layout,
1830                                           rbd_dev->mapping.size);
1831        object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1832                                            BITS_PER_BYTE);
1833        num_pages = calc_pages_for(0, object_map_bytes) + 1;
1834        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1835        if (IS_ERR(pages))
1836                return PTR_ERR(pages);
1837
1838        reply_len = num_pages * PAGE_SIZE;
1839        rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1840        ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1841                             "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1842                             NULL, 0, pages, &reply_len);
1843        if (ret)
1844                goto out;
1845
1846        p = page_address(pages[0]);
1847        end = p + min(reply_len, (size_t)PAGE_SIZE);
1848        ret = decode_object_map_header(&p, end, &object_map_size);
1849        if (ret)
1850                goto out;
1851
1852        if (object_map_size != num_objects) {
1853                rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1854                         object_map_size, num_objects);
1855                ret = -EINVAL;
1856                goto out;
1857        }
1858
1859        if (offset_in_page(p) + object_map_bytes > reply_len) {
1860                ret = -EINVAL;
1861                goto out;
1862        }
1863
1864        rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1865        if (!rbd_dev->object_map) {
1866                ret = -ENOMEM;
1867                goto out;
1868        }
1869
1870        rbd_dev->object_map_size = object_map_size;
1871        ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1872                                   offset_in_page(p), object_map_bytes);
1873
1874out:
1875        ceph_release_page_vector(pages, num_pages);
1876        return ret;
1877}
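
/*
 * For example, a 1 GiB mapping with the default 4 MiB objects and no fancy
 * striping gives num_objects == 256, object_map_bytes ==
 * DIV_ROUND_UP(256 * 2, 8) == 64 and num_pages == calc_pages_for(0, 64) + 1
 * == 2 (the extra page presumably leaves room for the BitVector header in
 * the reply).
 */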
1878
1879static void rbd_object_map_free(struct rbd_device *rbd_dev)
1880{
1881        kvfree(rbd_dev->object_map);
1882        rbd_dev->object_map = NULL;
1883        rbd_dev->object_map_size = 0;
1884}
1885
1886static int rbd_object_map_load(struct rbd_device *rbd_dev)
1887{
1888        int ret;
1889
1890        ret = __rbd_object_map_load(rbd_dev);
1891        if (ret)
1892                return ret;
1893
1894        ret = rbd_dev_v2_get_flags(rbd_dev);
1895        if (ret) {
1896                rbd_object_map_free(rbd_dev);
1897                return ret;
1898        }
1899
1900        if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1901                rbd_warn(rbd_dev, "object map is invalid");
1902
1903        return 0;
1904}
1905
1906static int rbd_object_map_open(struct rbd_device *rbd_dev)
1907{
1908        int ret;
1909
1910        ret = rbd_object_map_lock(rbd_dev);
1911        if (ret)
1912                return ret;
1913
1914        ret = rbd_object_map_load(rbd_dev);
1915        if (ret) {
1916                rbd_object_map_unlock(rbd_dev);
1917                return ret;
1918        }
1919
1920        return 0;
1921}
1922
1923static void rbd_object_map_close(struct rbd_device *rbd_dev)
1924{
1925        rbd_object_map_free(rbd_dev);
1926        rbd_object_map_unlock(rbd_dev);
1927}
1928
1929/*
1930 * This function needs snap_id (or more precisely just something to
1931 * distinguish between HEAD and snapshot object maps), new_state and
1932 * current_state that were passed to rbd_object_map_update().
1933 *
1934 * To avoid allocating and stashing a context we piggyback on the OSD
1935 * request.  snap_id is inferred from the op count (two ops for a HEAD
1936 * update, the first being assert_locked; one for a snapshot update), and
1937 * new_state/current_state come from the op built by rbd_cls_object_map_update().
1938 */
1939static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1940                                        struct ceph_osd_request *osd_req)
1941{
1942        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1943        struct ceph_osd_data *osd_data;
1944        u64 objno;
1945        u8 state, new_state, current_state;
1946        bool has_current_state;
1947        void *p;
1948
1949        if (osd_req->r_result)
1950                return osd_req->r_result;
1951
1952        /*
1953         * Nothing to do for a snapshot object map.
1954         */
1955        if (osd_req->r_num_ops == 1)
1956                return 0;
1957
1958        /*
1959         * Update in-memory HEAD object map.
1960         */
1961        rbd_assert(osd_req->r_num_ops == 2);
1962        osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1963        rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1964
1965        p = page_address(osd_data->pages[0]);
1966        objno = ceph_decode_64(&p);
1967        rbd_assert(objno == obj_req->ex.oe_objno);
1968        rbd_assert(ceph_decode_64(&p) == objno + 1);
1969        new_state = ceph_decode_8(&p);
1970        has_current_state = ceph_decode_8(&p);
1971        if (has_current_state)
1972                current_state = ceph_decode_8(&p);
1973
1974        spin_lock(&rbd_dev->object_map_lock);
1975        state = __rbd_object_map_get(rbd_dev, objno);
1976        if (!has_current_state || current_state == state ||
1977            (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1978                __rbd_object_map_set(rbd_dev, objno, new_state);
1979        spin_unlock(&rbd_dev->object_map_lock);
1980
1981        return 0;
1982}
1983
1984static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1985{
1986        struct rbd_obj_request *obj_req = osd_req->r_priv;
1987        int result;
1988
1989        dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1990             osd_req->r_result, obj_req);
1991
1992        result = rbd_object_map_update_finish(obj_req, osd_req);
1993        rbd_obj_handle_request(obj_req, result);
1994}
1995
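/*
 * Returns false when updating objno's entry to new_state would be
 * pointless: the entry already has that state, we would mark a
 * nonexistent object PENDING, or we would mark an object NONEXISTENT
 * when it isn't currently PENDING.
 */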
1996static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
1997{
1998        u8 state = rbd_object_map_get(rbd_dev, objno);
1999
2000        if (state == new_state ||
2001            (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2002            (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2003                return false;
2004
2005        return true;
2006}
2007
2008static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2009                                     int which, u64 objno, u8 new_state,
2010                                     const u8 *current_state)
2011{
2012        struct page **pages;
2013        void *p, *start;
2014        int ret;
2015
2016        ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2017        if (ret)
2018                return ret;
2019
2020        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2021        if (IS_ERR(pages))
2022                return PTR_ERR(pages);
2023
2024        p = start = page_address(pages[0]);
2025        ceph_encode_64(&p, objno);
2026        ceph_encode_64(&p, objno + 1);
2027        ceph_encode_8(&p, new_state);
2028        if (current_state) {
2029                ceph_encode_8(&p, 1);
2030                ceph_encode_8(&p, *current_state);
2031        } else {
2032                ceph_encode_8(&p, 0);
2033        }
2034
2035        osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2036                                          false, true);
2037        return 0;
2038}
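
/*
 * The request buffer assembled above (and re-decoded in
 * rbd_object_map_update_finish()) is, informally:
 *
 *     le64 start_objno;         // objno
 *     le64 end_objno;           // objno + 1, i.e. a single-object range
 *     u8 new_state;
 *     u8 has_current_state;     // 0 or 1
 *     u8 current_state;         // present only if has_current_state
 */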
2039
2040/*
2041 * Return:
2042 *   0 - object map update sent
2043 *   1 - object map update isn't needed
2044 *  <0 - error
2045 */
2046static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2047                                 u8 new_state, const u8 *current_state)
2048{
2049        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2050        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2051        struct ceph_osd_request *req;
2052        int num_ops = 1;
2053        int which = 0;
2054        int ret;
2055
2056        if (snap_id == CEPH_NOSNAP) {
2057                if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2058                        return 1;
2059
2060                num_ops++; /* assert_locked */
2061        }
2062
2063        req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2064        if (!req)
2065                return -ENOMEM;
2066
2067        list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2068        req->r_callback = rbd_object_map_callback;
2069        req->r_priv = obj_req;
2070
2071        rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2072        ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2073        req->r_flags = CEPH_OSD_FLAG_WRITE;
2074        ktime_get_real_ts64(&req->r_mtime);
2075
2076        if (snap_id == CEPH_NOSNAP) {
2077                /*
2078                 * Protect against possible race conditions during lock
2079                 * ownership transitions.
2080                 */
2081                ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2082                                             CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2083                if (ret)
2084                        return ret;
2085        }
2086
2087        ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2088                                        new_state, current_state);
2089        if (ret)
2090                return ret;
2091
2092        ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2093        if (ret)
2094                return ret;
2095
2096        ceph_osdc_start_request(osdc, req, false);
2097        return 0;
2098}
2099
2100static void prune_extents(struct ceph_file_extent *img_extents,
2101                          u32 *num_img_extents, u64 overlap)
2102{
2103        u32 cnt = *num_img_extents;
2104
2105        /* drop extents completely beyond the overlap */
2106        while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2107                cnt--;
2108
2109        if (cnt) {
2110                struct ceph_file_extent *ex = &img_extents[cnt - 1];
2111
2112                /* trim final overlapping extent */
2113                if (ex->fe_off + ex->fe_len > overlap)
2114                        ex->fe_len = overlap - ex->fe_off;
2115        }
2116
2117        *num_img_extents = cnt;
2118}
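
/*
 * For example, with overlap == 100 the extents {0,50}, {80,40}, {120,10}
 * (offset,length pairs sorted by offset) become {0,50}, {80,20}: the last
 * extent starts beyond the overlap and is dropped, the second is trimmed
 * to end at the overlap.
 */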
2119
2120/*
2121 * Determine the byte range(s) covered by either just the object extent
2122 * or the entire object in the parent image.
2123 */
2124static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2125                                    bool entire)
2126{
2127        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2128        int ret;
2129
2130        if (!rbd_dev->parent_overlap)
2131                return 0;
2132
2133        ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2134                                  entire ? 0 : obj_req->ex.oe_off,
2135                                  entire ? rbd_dev->layout.object_size :
2136                                                        obj_req->ex.oe_len,
2137                                  &obj_req->img_extents,
2138                                  &obj_req->num_img_extents);
2139        if (ret)
2140                return ret;
2141
2142        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2143                      rbd_dev->parent_overlap);
2144        return 0;
2145}
2146
2147static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2148{
2149        struct rbd_obj_request *obj_req = osd_req->r_priv;
2150
2151        switch (obj_req->img_request->data_type) {
2152        case OBJ_REQUEST_BIO:
2153                osd_req_op_extent_osd_data_bio(osd_req, which,
2154                                               &obj_req->bio_pos,
2155                                               obj_req->ex.oe_len);
2156                break;
2157        case OBJ_REQUEST_BVECS:
2158        case OBJ_REQUEST_OWN_BVECS:
2159                rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2160                                                        obj_req->ex.oe_len);
2161                rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2162                osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2163                                                    &obj_req->bvec_pos);
2164                break;
2165        default:
2166                BUG();
2167        }
2168}
2169
2170static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2171{
2172        struct page **pages;
2173
2174        /*
2175         * The response data for a STAT call consists of:
2176         *     le64 length;
2177         *     struct {
2178         *         le32 tv_sec;
2179         *         le32 tv_nsec;
2180         *     } mtime;
2181         */
2182        pages = ceph_alloc_page_vector(1, GFP_NOIO);
2183        if (IS_ERR(pages))
2184                return PTR_ERR(pages);
2185
2186        osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2187        osd_req_op_raw_data_in_pages(osd_req, which, pages,
2188                                     8 + sizeof(struct ceph_timespec),
2189                                     0, false, true);
2190        return 0;
2191}
2192
2193static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2194                                u32 bytes)
2195{
2196        struct rbd_obj_request *obj_req = osd_req->r_priv;
2197        int ret;
2198
2199        ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2200        if (ret)
2201                return ret;
2202
2203        osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2204                                          obj_req->copyup_bvec_count, bytes);
2205        return 0;
2206}
2207
2208static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2209{
2210        obj_req->read_state = RBD_OBJ_READ_START;
2211        return 0;
2212}
2213
2214static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2215                                      int which)
2216{
2217        struct rbd_obj_request *obj_req = osd_req->r_priv;
2218        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2219        u16 opcode;
2220
2221        if (!use_object_map(rbd_dev) ||
2222            !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2223                osd_req_op_alloc_hint_init(osd_req, which++,
2224                                           rbd_dev->layout.object_size,
2225                                           rbd_dev->layout.object_size,
2226                                           rbd_dev->opts->alloc_hint_flags);
2227        }
2228
2229        if (rbd_obj_is_entire(obj_req))
2230                opcode = CEPH_OSD_OP_WRITEFULL;
2231        else
2232                opcode = CEPH_OSD_OP_WRITE;
2233
2234        osd_req_op_extent_init(osd_req, which, opcode,
2235                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2236        rbd_osd_setup_data(osd_req, which);
2237}
2238
2239static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2240{
2241        int ret;
2242
2243        /* reverse map the entire object onto the parent */
2244        ret = rbd_obj_calc_img_extents(obj_req, true);
2245        if (ret)
2246                return ret;
2247
2248        if (rbd_obj_copyup_enabled(obj_req))
2249                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2250
2251        obj_req->write_state = RBD_OBJ_WRITE_START;
2252        return 0;
2253}
2254
2255static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2256{
2257        return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2258                                          CEPH_OSD_OP_ZERO;
2259}
2260
2261static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2262                                        int which)
2263{
2264        struct rbd_obj_request *obj_req = osd_req->r_priv;
2265
2266        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2267                rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2268                osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2269        } else {
2270                osd_req_op_extent_init(osd_req, which,
2271                                       truncate_or_zero_opcode(obj_req),
2272                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2273                                       0, 0);
2274        }
2275}
2276
2277static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2278{
2279        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2280        u64 off, next_off;
2281        int ret;
2282
2283        /*
2284         * Align the range to alloc_size boundary and punt on discards
2285         * that are too small to free up any space.
2286         *
2287         * alloc_size == object_size && is_tail() is a special case for
2288         * filestore with filestore_punch_hole = false, needed to allow
2289         * truncate (in addition to delete).
2290         */
2291        if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2292            !rbd_obj_is_tail(obj_req)) {
2293                off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2294                next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2295                                      rbd_dev->opts->alloc_size);
2296                if (off >= next_off)
2297                        return 1;
2298
2299                dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2300                     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2301                     off, next_off - off);
2302                obj_req->ex.oe_off = off;
2303                obj_req->ex.oe_len = next_off - off;
2304        }
2305
2306        /* reverse map the entire object onto the parent */
2307        ret = rbd_obj_calc_img_extents(obj_req, true);
2308        if (ret)
2309                return ret;
2310
2311        obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2312        if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2313                obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2314
2315        obj_req->write_state = RBD_OBJ_WRITE_START;
2316        return 0;
2317}
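
/*
 * For example, with alloc_size == 64K a discard of 10K~150K (off~len) is
 * rounded in to 64K~64K (off == 64K, next_off == 128K), while a discard
 * of 10K~50K rounds to an empty range (off == 64K, next_off == 0) and is
 * dropped by returning 1.
 */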
2318
2319static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2320                                        int which)
2321{
2322        struct rbd_obj_request *obj_req = osd_req->r_priv;
2323        u16 opcode;
2324
2325        if (rbd_obj_is_entire(obj_req)) {
2326                if (obj_req->num_img_extents) {
2327                        if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2328                                osd_req_op_init(osd_req, which++,
2329                                                CEPH_OSD_OP_CREATE, 0);
2330                        opcode = CEPH_OSD_OP_TRUNCATE;
2331                } else {
2332                        rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2333                        osd_req_op_init(osd_req, which++,
2334                                        CEPH_OSD_OP_DELETE, 0);
2335                        opcode = 0;
2336                }
2337        } else {
2338                opcode = truncate_or_zero_opcode(obj_req);
2339        }
2340
2341        if (opcode)
2342                osd_req_op_extent_init(osd_req, which, opcode,
2343                                       obj_req->ex.oe_off, obj_req->ex.oe_len,
2344                                       0, 0);
2345}
2346
2347static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2348{
2349        int ret;
2350
2351        /* reverse map the entire object onto the parent */
2352        ret = rbd_obj_calc_img_extents(obj_req, true);
2353        if (ret)
2354                return ret;
2355
2356        if (rbd_obj_copyup_enabled(obj_req))
2357                obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2358        if (!obj_req->num_img_extents) {
2359                obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2360                if (rbd_obj_is_entire(obj_req))
2361                        obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2362        }
2363
2364        obj_req->write_state = RBD_OBJ_WRITE_START;
2365        return 0;
2366}
2367
2368static int count_write_ops(struct rbd_obj_request *obj_req)
2369{
2370        struct rbd_img_request *img_req = obj_req->img_request;
2371
2372        switch (img_req->op_type) {
2373        case OBJ_OP_WRITE:
2374                if (!use_object_map(img_req->rbd_dev) ||
2375                    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2376                        return 2; /* setallochint + write/writefull */
2377
2378                return 1; /* write/writefull */
2379        case OBJ_OP_DISCARD:
2380                return 1; /* delete/truncate/zero */
2381        case OBJ_OP_ZEROOUT:
2382                if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2383                    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2384                        return 2; /* create + truncate */
2385
2386                return 1; /* delete/truncate/zero */
2387        default:
2388                BUG();
2389        }
2390}
2391
2392static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2393                                    int which)
2394{
2395        struct rbd_obj_request *obj_req = osd_req->r_priv;
2396
2397        switch (obj_req->img_request->op_type) {
2398        case OBJ_OP_WRITE:
2399                __rbd_osd_setup_write_ops(osd_req, which);
2400                break;
2401        case OBJ_OP_DISCARD:
2402                __rbd_osd_setup_discard_ops(osd_req, which);
2403                break;
2404        case OBJ_OP_ZEROOUT:
2405                __rbd_osd_setup_zeroout_ops(osd_req, which);
2406                break;
2407        default:
2408                BUG();
2409        }
2410}
2411
2412/*
2413 * Prune the list of object requests (adjust offset and/or length, drop
2414 * redundant requests).  Prepare object request state machines and image
2415 * request state machine for execution.
2416 */
2417static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2418{
2419        struct rbd_obj_request *obj_req, *next_obj_req;
2420        int ret;
2421
2422        for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2423                switch (img_req->op_type) {
2424                case OBJ_OP_READ:
2425                        ret = rbd_obj_init_read(obj_req);
2426                        break;
2427                case OBJ_OP_WRITE:
2428                        ret = rbd_obj_init_write(obj_req);
2429                        break;
2430                case OBJ_OP_DISCARD:
2431                        ret = rbd_obj_init_discard(obj_req);
2432                        break;
2433                case OBJ_OP_ZEROOUT:
2434                        ret = rbd_obj_init_zeroout(obj_req);
2435                        break;
2436                default:
2437                        BUG();
2438                }
2439                if (ret < 0)
2440                        return ret;
2441                if (ret > 0) {
2442                        rbd_img_obj_request_del(img_req, obj_req);
2443                        continue;
2444                }
2445        }
2446
2447        img_req->state = RBD_IMG_START;
2448        return 0;
2449}
2450
2451union rbd_img_fill_iter {
2452        struct ceph_bio_iter    bio_iter;
2453        struct ceph_bvec_iter   bvec_iter;
2454};
2455
2456struct rbd_img_fill_ctx {
2457        enum obj_request_type   pos_type;
2458        union rbd_img_fill_iter *pos;
2459        union rbd_img_fill_iter iter;
2460        ceph_object_extent_fn_t set_pos_fn;
2461        ceph_object_extent_fn_t count_fn;
2462        ceph_object_extent_fn_t copy_fn;
2463};
2464
2465static struct ceph_object_extent *alloc_object_extent(void *arg)
2466{
2467        struct rbd_img_request *img_req = arg;
2468        struct rbd_obj_request *obj_req;
2469
2470        obj_req = rbd_obj_request_create();
2471        if (!obj_req)
2472                return NULL;
2473
2474        rbd_img_obj_request_add(img_req, obj_req);
2475        return &obj_req->ex;
2476}
2477
2478/*
2479 * While su != os && sc == 1 is technically not fancy (it's the same
2480 * layout as su == os && sc == 1), we can't use the nocopy path for it
2481 * because ->set_pos_fn() should be called only once per object.
2482 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2483 * treat su != os && sc == 1 as fancy.
2484 */
2485static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2486{
2487        return l->stripe_unit != l->object_size;
2488}
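
/*
 * For example, the default layout of su == os, sc == 1 (assuming 4M/4M/1)
 * is not fancy, whereas su == 64K, sc == 4, os == 4M is and forces the
 * bio_vec copying path in rbd_img_fill_request().
 */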
2489
2490static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2491                                       struct ceph_file_extent *img_extents,
2492                                       u32 num_img_extents,
2493                                       struct rbd_img_fill_ctx *fctx)
2494{
2495        u32 i;
2496        int ret;
2497
2498        img_req->data_type = fctx->pos_type;
2499
2500        /*
2501         * Create object requests and set each object request's starting
2502         * position in the provided bio (list) or bio_vec array.
2503         */
2504        fctx->iter = *fctx->pos;
2505        for (i = 0; i < num_img_extents; i++) {
2506                ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2507                                           img_extents[i].fe_off,
2508                                           img_extents[i].fe_len,
2509                                           &img_req->object_extents,
2510                                           alloc_object_extent, img_req,
2511                                           fctx->set_pos_fn, &fctx->iter);
2512                if (ret)
2513                        return ret;
2514        }
2515
2516        return __rbd_img_fill_request(img_req);
2517}
2518
2519/*
2520 * Map a list of image extents to a list of object extents, create the
2521 * corresponding object requests (normally each to a different object,
2522 * but not always) and add them to @img_req.  For each object request,
2523 * set up its data descriptor to point to the corresponding chunk(s) of
2524 * @fctx->pos data buffer.
2525 *
2526 * Because ceph_file_to_extents() will merge adjacent object extents
2527 * together, each object request's data descriptor may point to multiple
2528 * different chunks of @fctx->pos data buffer.
2529 *
2530 * @fctx->pos data buffer is assumed to be large enough.
2531 */
2532static int rbd_img_fill_request(struct rbd_img_request *img_req,
2533                                struct ceph_file_extent *img_extents,
2534                                u32 num_img_extents,
2535                                struct rbd_img_fill_ctx *fctx)
2536{
2537        struct rbd_device *rbd_dev = img_req->rbd_dev;
2538        struct rbd_obj_request *obj_req;
2539        u32 i;
2540        int ret;
2541
2542        if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2543            !rbd_layout_is_fancy(&rbd_dev->layout))
2544                return rbd_img_fill_request_nocopy(img_req, img_extents,
2545                                                   num_img_extents, fctx);
2546
2547        img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2548
2549        /*
2550         * Create object requests and determine ->bvec_count for each object
2551         * request.  Note that ->bvec_count sum over all object requests may
2552         * be greater than the number of bio_vecs in the provided bio (list)
2553         * or bio_vec array because when mapped, those bio_vecs can straddle
2554         * stripe unit boundaries.
2555         */
2556        fctx->iter = *fctx->pos;
2557        for (i = 0; i < num_img_extents; i++) {
2558                ret = ceph_file_to_extents(&rbd_dev->layout,
2559                                           img_extents[i].fe_off,
2560                                           img_extents[i].fe_len,
2561                                           &img_req->object_extents,
2562                                           alloc_object_extent, img_req,
2563                                           fctx->count_fn, &fctx->iter);
2564                if (ret)
2565                        return ret;
2566        }
2567
2568        for_each_obj_request(img_req, obj_req) {
2569                obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2570                                              sizeof(*obj_req->bvec_pos.bvecs),
2571                                              GFP_NOIO);
2572                if (!obj_req->bvec_pos.bvecs)
2573                        return -ENOMEM;
2574        }
2575
2576        /*
2577         * Fill in each object request's private bio_vec array, splitting and
2578         * rearranging the provided bio_vecs in stripe unit chunks as needed.
2579         */
2580        fctx->iter = *fctx->pos;
2581        for (i = 0; i < num_img_extents; i++) {
2582                ret = ceph_iterate_extents(&rbd_dev->layout,
2583                                           img_extents[i].fe_off,
2584                                           img_extents[i].fe_len,
2585                                           &img_req->object_extents,
2586                                           fctx->copy_fn, &fctx->iter);
2587                if (ret)
2588                        return ret;
2589        }
2590
2591        return __rbd_img_fill_request(img_req);
2592}
2593
2594static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2595                               u64 off, u64 len)
2596{
2597        struct ceph_file_extent ex = { off, len };
2598        union rbd_img_fill_iter dummy = {};
2599        struct rbd_img_fill_ctx fctx = {
2600                .pos_type = OBJ_REQUEST_NODATA,
2601                .pos = &dummy,
2602        };
2603
2604        return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2605}
2606
2607static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2608{
2609        struct rbd_obj_request *obj_req =
2610            container_of(ex, struct rbd_obj_request, ex);
2611        struct ceph_bio_iter *it = arg;
2612
2613        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2614        obj_req->bio_pos = *it;
2615        ceph_bio_iter_advance(it, bytes);
2616}
2617
2618static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2619{
2620        struct rbd_obj_request *obj_req =
2621            container_of(ex, struct rbd_obj_request, ex);
2622        struct ceph_bio_iter *it = arg;
2623
2624        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2625        ceph_bio_iter_advance_step(it, bytes, ({
2626                obj_req->bvec_count++;
2627        }));
2628
2630
2631static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2632{
2633        struct rbd_obj_request *obj_req =
2634            container_of(ex, struct rbd_obj_request, ex);
2635        struct ceph_bio_iter *it = arg;
2636
2637        dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2638        ceph_bio_iter_advance_step(it, bytes, ({
2639                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2640                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2641        }));
2642}
2643
2644static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2645                                   struct ceph_file_extent *img_extents,
2646                                   u32 num_img_extents,
2647                                   struct ceph_bio_iter *bio_pos)
2648{
2649        struct rbd_img_fill_ctx fctx = {
2650                .pos_type = OBJ_REQUEST_BIO,
2651                .pos = (union rbd_img_fill_iter *)bio_pos,
2652                .set_pos_fn = set_bio_pos,
2653                .count_fn = count_bio_bvecs,
2654                .copy_fn = copy_bio_bvecs,
2655        };
2656
2657        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2658                                    &fctx);
2659}
2660
2661static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2662                                 u64 off, u64 len, struct bio *bio)
2663{
2664        struct ceph_file_extent ex = { off, len };
2665        struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2666
2667        return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2668}
2669
2670static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2671{
2672        struct rbd_obj_request *obj_req =
2673            container_of(ex, struct rbd_obj_request, ex);
2674        struct ceph_bvec_iter *it = arg;
2675
2676        obj_req->bvec_pos = *it;
2677        ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2678        ceph_bvec_iter_advance(it, bytes);
2679}
2680
2681static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2682{
2683        struct rbd_obj_request *obj_req =
2684            container_of(ex, struct rbd_obj_request, ex);
2685        struct ceph_bvec_iter *it = arg;
2686
2687        ceph_bvec_iter_advance_step(it, bytes, ({
2688                obj_req->bvec_count++;
2689        }));
2690}
2691
2692static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2693{
2694        struct rbd_obj_request *obj_req =
2695            container_of(ex, struct rbd_obj_request, ex);
2696        struct ceph_bvec_iter *it = arg;
2697
2698        ceph_bvec_iter_advance_step(it, bytes, ({
2699                obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2700                obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2701        }));
2702}
2703
2704static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2705                                     struct ceph_file_extent *img_extents,
2706                                     u32 num_img_extents,
2707                                     struct ceph_bvec_iter *bvec_pos)
2708{
2709        struct rbd_img_fill_ctx fctx = {
2710                .pos_type = OBJ_REQUEST_BVECS,
2711                .pos = (union rbd_img_fill_iter *)bvec_pos,
2712                .set_pos_fn = set_bvec_pos,
2713                .count_fn = count_bvecs,
2714                .copy_fn = copy_bvecs,
2715        };
2716
2717        return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2718                                    &fctx);
2719}
2720
2721static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2722                                   struct ceph_file_extent *img_extents,
2723                                   u32 num_img_extents,
2724                                   struct bio_vec *bvecs)
2725{
2726        struct ceph_bvec_iter it = {
2727                .bvecs = bvecs,
2728                .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2729                                                             num_img_extents) },
2730        };
2731
2732        return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2733                                         &it);
2734}
2735
2736static void rbd_img_handle_request_work(struct work_struct *work)
2737{
2738        struct rbd_img_request *img_req =
2739            container_of(work, struct rbd_img_request, work);
2740
2741        rbd_img_handle_request(img_req, img_req->work_result);
2742}
2743
2744static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2745{
2746        INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2747        img_req->work_result = result;
2748        queue_work(rbd_wq, &img_req->work);
2749}
2750
2751static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2752{
2753        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2754
2755        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2756                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2757                return true;
2758        }
2759
2760        dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2761             obj_req->ex.oe_objno);
2762        return false;
2763}
2764
2765static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2766{
2767        struct ceph_osd_request *osd_req;
2768        int ret;
2769
2770        osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2771        if (IS_ERR(osd_req))
2772                return PTR_ERR(osd_req);
2773
2774        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2775                               obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2776        rbd_osd_setup_data(osd_req, 0);
2777        rbd_osd_format_read(osd_req);
2778
2779        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2780        if (ret)
2781                return ret;
2782
2783        rbd_osd_submit(osd_req);
2784        return 0;
2785}
2786
2787static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2788{
2789        struct rbd_img_request *img_req = obj_req->img_request;
2790        struct rbd_device *parent = img_req->rbd_dev->parent;
2791        struct rbd_img_request *child_img_req;
2792        int ret;
2793
2794        child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2795        if (!child_img_req)
2796                return -ENOMEM;
2797
2798        rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2799        __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2800        child_img_req->obj_request = obj_req;
2801
2802        down_read(&parent->header_rwsem);
2803        rbd_img_capture_header(child_img_req);
2804        up_read(&parent->header_rwsem);
2805
2806        dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2807             obj_req);
2808
2809        if (!rbd_img_is_write(img_req)) {
2810                switch (img_req->data_type) {
2811                case OBJ_REQUEST_BIO:
2812                        ret = __rbd_img_fill_from_bio(child_img_req,
2813                                                      obj_req->img_extents,
2814                                                      obj_req->num_img_extents,
2815                                                      &obj_req->bio_pos);
2816                        break;
2817                case OBJ_REQUEST_BVECS:
2818                case OBJ_REQUEST_OWN_BVECS:
2819                        ret = __rbd_img_fill_from_bvecs(child_img_req,
2820                                                      obj_req->img_extents,
2821                                                      obj_req->num_img_extents,
2822                                                      &obj_req->bvec_pos);
2823                        break;
2824                default:
2825                        BUG();
2826                }
2827        } else {
2828                ret = rbd_img_fill_from_bvecs(child_img_req,
2829                                              obj_req->img_extents,
2830                                              obj_req->num_img_extents,
2831                                              obj_req->copyup_bvecs);
2832        }
2833        if (ret) {
2834                rbd_img_request_destroy(child_img_req);
2835                return ret;
2836        }
2837
2838        /* avoid parent chain recursion */
2839        rbd_img_schedule(child_img_req, 0);
2840        return 0;
2841}
2842
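/*
 * Summary of the read state machine implemented by the switch below:
 *
 *   RBD_OBJ_READ_START  -> issue the object read, or fake -ENOENT if the
 *                          object map says the object doesn't exist
 *   RBD_OBJ_READ_OBJECT -> on -ENOENT with a parent overlap, redirect the
 *                          read to the parent image; otherwise zero-fill
 *                          holes and short reads and complete
 *   RBD_OBJ_READ_PARENT -> zero-fill past the parent overlap and complete
 */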
2843static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2844{
2845        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2846        int ret;
2847
2848again:
2849        switch (obj_req->read_state) {
2850        case RBD_OBJ_READ_START:
2851                rbd_assert(!*result);
2852
2853                if (!rbd_obj_may_exist(obj_req)) {
2854                        *result = -ENOENT;
2855                        obj_req->read_state = RBD_OBJ_READ_OBJECT;
2856                        goto again;
2857                }
2858
2859                ret = rbd_obj_read_object(obj_req);
2860                if (ret) {
2861                        *result = ret;
2862                        return true;
2863                }
2864                obj_req->read_state = RBD_OBJ_READ_OBJECT;
2865                return false;
2866        case RBD_OBJ_READ_OBJECT:
2867                if (*result == -ENOENT && rbd_dev->parent_overlap) {
2868                        /* reverse map this object extent onto the parent */
2869                        ret = rbd_obj_calc_img_extents(obj_req, false);
2870                        if (ret) {
2871                                *result = ret;
2872                                return true;
2873                        }
2874                        if (obj_req->num_img_extents) {
2875                                ret = rbd_obj_read_from_parent(obj_req);
2876                                if (ret) {
2877                                        *result = ret;
2878                                        return true;
2879                                }
2880                                obj_req->read_state = RBD_OBJ_READ_PARENT;
2881                                return false;
2882                        }
2883                }
2884
2885                /*
2886                 * -ENOENT means a hole in the image -- zero-fill the entire
2887                 * length of the request.  A short read also implies zero-fill
2888                 * to the end of the request.
2889                 */
2890                if (*result == -ENOENT) {
2891                        rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2892                        *result = 0;
2893                } else if (*result >= 0) {
2894                        if (*result < obj_req->ex.oe_len)
2895                                rbd_obj_zero_range(obj_req, *result,
2896                                                obj_req->ex.oe_len - *result);
2897                        else
2898                                rbd_assert(*result == obj_req->ex.oe_len);
2899                        *result = 0;
2900                }
2901                return true;
2902        case RBD_OBJ_READ_PARENT:
2903                /*
2904                 * The parent read covers only the parent overlap --
2905                 * zero-fill from the overlap to the end of the request.
2906                 */
2907                if (!*result) {
2908                        u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2909
2910                        if (obj_overlap < obj_req->ex.oe_len)
2911                                rbd_obj_zero_range(obj_req, obj_overlap,
2912                                            obj_req->ex.oe_len - obj_overlap);
2913                }
2914                return true;
2915        default:
2916                BUG();
2917        }
2918}
2919
2920static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2921{
2922        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2923
2924        if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2925                obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2926
2927        if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2928            (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2929                dout("%s %p noop for nonexistent\n", __func__, obj_req);
2930                return true;
2931        }
2932
2933        return false;
2934}
2935
2936/*
2937 * Return:
2938 *   0 - object map update sent
2939 *   1 - object map update isn't needed
2940 *  <0 - error
2941 */
2942static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2943{
2944        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2945        u8 new_state;
2946
2947        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2948                return 1;
2949
2950        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2951                new_state = OBJECT_PENDING;
2952        else
2953                new_state = OBJECT_EXISTS;
2954
2955        return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2956}
2957
2958static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2959{
2960        struct ceph_osd_request *osd_req;
2961        int num_ops = count_write_ops(obj_req);
2962        int which = 0;
2963        int ret;
2964
2965        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2966                num_ops++; /* stat */
2967
2968        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2969        if (IS_ERR(osd_req))
2970                return PTR_ERR(osd_req);
2971
2972        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2973                ret = rbd_osd_setup_stat(osd_req, which++);
2974                if (ret)
2975                        return ret;
2976        }
2977
2978        rbd_osd_setup_write_ops(osd_req, which);
2979        rbd_osd_format_write(osd_req);
2980
2981        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2982        if (ret)
2983                return ret;
2984
2985        rbd_osd_submit(osd_req);
2986        return 0;
2987}
2988
2989/*
2990 * copyup_bvecs pages are never highmem pages
2991 */
2992static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2993{
2994        struct ceph_bvec_iter it = {
2995                .bvecs = bvecs,
2996                .iter = { .bi_size = bytes },
2997        };
2998
2999        ceph_bvec_iter_advance_step(&it, bytes, ({
3000                if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3001                               bv.bv_len))
3002                        return false;
3003        }));
3004        return true;
3005}
3006
3007#define MODS_ONLY       U32_MAX
3008
3009static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3010                                      u32 bytes)
3011{
3012        struct ceph_osd_request *osd_req;
3013        int ret;
3014
3015        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3016        rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3017
3018        osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3019        if (IS_ERR(osd_req))
3020                return PTR_ERR(osd_req);
3021
3022        ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3023        if (ret)
3024                return ret;
3025
3026        rbd_osd_format_write(osd_req);
3027
3028        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3029        if (ret)
3030                return ret;
3031
3032        rbd_osd_submit(osd_req);
3033        return 0;
3034}
3035
3036static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3037                                        u32 bytes)
3038{
3039        struct ceph_osd_request *osd_req;
3040        int num_ops = count_write_ops(obj_req);
3041        int which = 0;
3042        int ret;
3043
3044        dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3045
3046        if (bytes != MODS_ONLY)
3047                num_ops++; /* copyup */
3048
3049        osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3050        if (IS_ERR(osd_req))
3051                return PTR_ERR(osd_req);
3052
3053        if (bytes != MODS_ONLY) {
3054                ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3055                if (ret)
3056                        return ret;
3057        }
3058
3059        rbd_osd_setup_write_ops(osd_req, which);
3060        rbd_osd_format_write(osd_req);
3061
3062        ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3063        if (ret)
3064                return ret;
3065
3066        rbd_osd_submit(osd_req);
3067        return 0;
3068}
3069
3070static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3071{
3072        u32 i;
3073
3074        rbd_assert(!obj_req->copyup_bvecs);
3075        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3076        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3077                                        sizeof(*obj_req->copyup_bvecs),
3078                                        GFP_NOIO);
3079        if (!obj_req->copyup_bvecs)
3080                return -ENOMEM;
3081
3082        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3083                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3084
3085                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3086                if (!obj_req->copyup_bvecs[i].bv_page)
3087                        return -ENOMEM;
3088
3089                obj_req->copyup_bvecs[i].bv_offset = 0;
3090                obj_req->copyup_bvecs[i].bv_len = len;
3091                obj_overlap -= len;
3092        }
3093
3094        rbd_assert(!obj_overlap);
3095        return 0;
3096}
3097
3098/*
3099 * The target object doesn't exist.  Read the data for the entire
3100 * target object up to the overlap point (if any) from the parent,
3101 * so we can use it for a copyup.
3102 */
3103static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3104{
3105        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3106        int ret;
3107
3108        rbd_assert(obj_req->num_img_extents);
3109        prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3110                      rbd_dev->parent_overlap);
3111        if (!obj_req->num_img_extents) {
3112                /*
3113                 * The overlap has become 0 (most likely because the
3114                 * image has been flattened).  Re-submit the original write
3115                 * request -- pass MODS_ONLY since the copyup isn't needed
3116                 * anymore.
3117                 */
3118                return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3119        }
3120
3121        ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3122        if (ret)
3123                return ret;
3124
3125        return rbd_obj_read_from_parent(obj_req);
3126}
3127
3128static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3129{
3130        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3131        struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3132        u8 new_state;
3133        u32 i;
3134        int ret;
3135
3136        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3137
3138        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3139                return;
3140
3141        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3142                return;
3143
3144        for (i = 0; i < snapc->num_snaps; i++) {
3145                if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3146                    i + 1 < snapc->num_snaps)
3147                        new_state = OBJECT_EXISTS_CLEAN;
3148                else
3149                        new_state = OBJECT_EXISTS;
3150
3151                ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3152                                            new_state, NULL);
3153                if (ret < 0) {
3154                        obj_req->pending.result = ret;
3155                        return;
3156                }
3157
3158                rbd_assert(!ret);
3159                obj_req->pending.num_pending++;
3160        }
3161}
3162
3163static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3164{
3165        u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3166        int ret;
3167
3168        rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3169
3170        /*
3171         * Only send non-zero copyup data to save some I/O and network
3172         * bandwidth -- zero copyup data is equivalent to the object not
3173         * existing.
3174         */
3175        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3176                bytes = 0;
3177
3178        if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3179                /*
3180                 * Send a copyup request with an empty snapshot context to
3181                 * deep-copyup the object through all existing snapshots.
3182                 * A second request with the current snapshot context will be
3183                 * sent for the actual modification.
3184                 */
3185                ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3186                if (ret) {
3187                        obj_req->pending.result = ret;
3188                        return;
3189                }
3190
3191                obj_req->pending.num_pending++;
3192                bytes = MODS_ONLY;
3193        }
3194
3195        ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3196        if (ret) {
3197                obj_req->pending.result = ret;
3198                return;
3199        }
3200
3201        obj_req->pending.num_pending++;
3202}
3203
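/*
 * Copyup state machine: START reads the parent data (or skips straight
 * to WRITE_OBJECT if the parent overlap is gone), READ_PARENT checks
 * for all-zero data and updates the per-snapshot object maps,
 * OBJECT_MAPS issues the copyup and modification requests and
 * WRITE_OBJECT completes.  The __RBD_OBJ_COPYUP_* states wait for the
 * outstanding replies via pending_result_dec().
 */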
3204static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3205{
3206        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3207        int ret;
3208
3209again:
3210        switch (obj_req->copyup_state) {
3211        case RBD_OBJ_COPYUP_START:
3212                rbd_assert(!*result);
3213
3214                ret = rbd_obj_copyup_read_parent(obj_req);
3215                if (ret) {
3216                        *result = ret;
3217                        return true;
3218                }
3219                if (obj_req->num_img_extents)
3220                        obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3221                else
3222                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3223                return false;
3224        case RBD_OBJ_COPYUP_READ_PARENT:
3225                if (*result)
3226                        return true;
3227
3228                if (is_zero_bvecs(obj_req->copyup_bvecs,
3229                                  rbd_obj_img_extents_bytes(obj_req))) {
3230                        dout("%s %p detected zeros\n", __func__, obj_req);
3231                        obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3232                }
3233
3234                rbd_obj_copyup_object_maps(obj_req);
3235                if (!obj_req->pending.num_pending) {
3236                        *result = obj_req->pending.result;
3237                        obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3238                        goto again;
3239                }
3240                obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3241                return false;
3242        case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3243                if (!pending_result_dec(&obj_req->pending, result))
3244                        return false;
3245                fallthrough;
3246        case RBD_OBJ_COPYUP_OBJECT_MAPS:
3247                if (*result) {
3248                        rbd_warn(rbd_dev, "snap object map update failed: %d",
3249                                 *result);
3250                        return true;
3251                }
3252
3253                rbd_obj_copyup_write_object(obj_req);
3254                if (!obj_req->pending.num_pending) {
3255                        *result = obj_req->pending.result;
3256                        obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3257                        goto again;
3258                }
3259                obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3260                return false;
3261        case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3262                if (!pending_result_dec(&obj_req->pending, result))
3263                        return false;
3264                fallthrough;
3265        case RBD_OBJ_COPYUP_WRITE_OBJECT:
3266                return true;
3267        default:
3268                BUG();
3269        }
3270}
3271
3272/*
3273 * Return:
3274 *   0 - object map update sent
3275 *   1 - object map update isn't needed
3276 *  <0 - error
3277 */
3278static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3279{
3280        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3281        u8 current_state = OBJECT_PENDING;
3282
3283        if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3284                return 1;
3285
3286        if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3287                return 1;
3288
3289        return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3290                                     &current_state);
3291}
3292
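/*
 * Object write state machine: optional pre object map update, then the
 * write itself.  A write that fails with -ENOENT with copyup enabled
 * drops into the copyup state machine above; a post object map update
 * (deletions only) finishes the request.
 */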
3293static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3294{
3295        struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3296        int ret;
3297
3298again:
3299        switch (obj_req->write_state) {
3300        case RBD_OBJ_WRITE_START:
3301                rbd_assert(!*result);
3302
3303                if (rbd_obj_write_is_noop(obj_req))
3304                        return true;
3305
3306                ret = rbd_obj_write_pre_object_map(obj_req);
3307                if (ret < 0) {
3308                        *result = ret;
3309                        return true;
3310                }
3311                obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3312                if (ret > 0)
3313                        goto again;
3314                return false;
3315        case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3316                if (*result) {
3317                        rbd_warn(rbd_dev, "pre object map update failed: %d",
3318                                 *result);
3319                        return true;
3320                }
3321                ret = rbd_obj_write_object(obj_req);
3322                if (ret) {
3323                        *result = ret;
3324                        return true;
3325                }
3326                obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3327                return false;
3328        case RBD_OBJ_WRITE_OBJECT:
3329                if (*result == -ENOENT) {
3330                        if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3331                                *result = 0;
3332                                obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3333                                obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3334                                goto again;
3335                        }
3336                        /*
3337                         * On a non-existent object:
3338                         *   delete - -ENOENT, truncate/zero - 0
3339                         */
3340                        if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3341                                *result = 0;
3342                }
3343                if (*result)
3344                        return true;
3345
3346                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3347                goto again;
3348        case __RBD_OBJ_WRITE_COPYUP:
3349                if (!rbd_obj_advance_copyup(obj_req, result))
3350                        return false;
3351                fallthrough;
3352        case RBD_OBJ_WRITE_COPYUP:
3353                if (*result) {
3354                        rbd_warn(rbd_dev, "copyup failed: %d", *result);
3355                        return true;
3356                }
3357                ret = rbd_obj_write_post_object_map(obj_req);
3358                if (ret < 0) {
3359                        *result = ret;
3360                        return true;
3361                }
3362                obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3363                if (ret > 0)
3364                        goto again;
3365                return false;
3366        case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3367                if (*result)
3368                        rbd_warn(rbd_dev, "post object map update failed: %d",
3369                                 *result);
3370                return true;
3371        default:
3372                BUG();
3373        }
3374}
3375
3376/*
3377 * Return true if @obj_req is completed.
3378 */
3379static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3380                                     int *result)
3381{
3382        struct rbd_img_request *img_req = obj_req->img_request;
3383        struct rbd_device *rbd_dev = img_req->rbd_dev;
3384        bool done;
3385
3386        mutex_lock(&obj_req->state_mutex);
3387        if (!rbd_img_is_write(img_req))
3388                done = rbd_obj_advance_read(obj_req, result);
3389        else
3390                done = rbd_obj_advance_write(obj_req, result);
3391        mutex_unlock(&obj_req->state_mutex);
3392
3393        if (done && *result) {
3394                rbd_assert(*result < 0);
3395                rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3396                         obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3397                         obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3398        }
3399        return done;
3400}
3401
3402/*
3403 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3404 * recursion.
3405 */
3406static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3407{
3408        if (__rbd_obj_handle_request(obj_req, &result))
3409                rbd_img_handle_request(obj_req->img_request, result);
3410}
3411
3412static bool need_exclusive_lock(struct rbd_img_request *img_req)
3413{
3414        struct rbd_device *rbd_dev = img_req->rbd_dev;
3415
3416        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3417                return false;
3418
3419        if (rbd_is_ro(rbd_dev))
3420                return false;
3421
3422        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3423        if (rbd_dev->opts->lock_on_read ||
3424            (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3425                return true;
3426
3427        return rbd_img_is_write(img_req);
3428}
3429
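/*
 * Returns true if the exclusive lock is already held -- the request is
 * put on running_list and may proceed.  Otherwise the request is put
 * on acquiring_list to wait for the lock.
 */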
3430static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3431{
3432        struct rbd_device *rbd_dev = img_req->rbd_dev;
3433        bool locked;
3434
3435        lockdep_assert_held(&rbd_dev->lock_rwsem);
3436        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3437        spin_lock(&rbd_dev->lock_lists_lock);
3438        rbd_assert(list_empty(&img_req->lock_item));
3439        if (!locked)
3440                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3441        else
3442                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3443        spin_unlock(&rbd_dev->lock_lists_lock);
3444        return locked;
3445}
3446
3447static void rbd_lock_del_request(struct rbd_img_request *img_req)
3448{
3449        struct rbd_device *rbd_dev = img_req->rbd_dev;
3450        bool need_wakeup;
3451
3452        lockdep_assert_held(&rbd_dev->lock_rwsem);
3453        spin_lock(&rbd_dev->lock_lists_lock);
3454        rbd_assert(!list_empty(&img_req->lock_item));
3455        list_del_init(&img_req->lock_item);
3456        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3457                       list_empty(&rbd_dev->running_list));
3458        spin_unlock(&rbd_dev->lock_lists_lock);
3459        if (need_wakeup)
3460                complete(&rbd_dev->releasing_wait);
3461}
3462
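/*
 * Return:
 *   1 - caller may proceed (no lock needed or lock already held)
 *   0 - request queued, lock acquisition scheduled
 *  <0 - error
 */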
3463static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3464{
3465        struct rbd_device *rbd_dev = img_req->rbd_dev;
3466
3467        if (!need_exclusive_lock(img_req))
3468                return 1;
3469
3470        if (rbd_lock_add_request(img_req))
3471                return 1;
3472
3473        if (rbd_dev->opts->exclusive) {
3474                WARN_ON(1); /* lock got released? */
3475                return -EROFS;
3476        }
3477
3478        /*
3479         * Note the use of mod_delayed_work() in rbd_acquire_lock()
3480         * and cancel_delayed_work() in wake_lock_waiters().
3481         */
3482        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3483        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3484        return 0;
3485}
3486
3487static void rbd_img_object_requests(struct rbd_img_request *img_req)
3488{
3489        struct rbd_obj_request *obj_req;
3490
3491        rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3492
3493        for_each_obj_request(img_req, obj_req) {
3494                int result = 0;
3495
3496                if (__rbd_obj_handle_request(obj_req, &result)) {
3497                        if (result) {
3498                                img_req->pending.result = result;
3499                                return;
3500                        }
3501                } else {
3502                        img_req->pending.num_pending++;
3503                }
3504        }
3505}
3506
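/*
 * Image request state machine: take the exclusive lock if needed, then
 * kick off all object requests and wait for them to complete.
 */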
3507static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3508{
3509        struct rbd_device *rbd_dev = img_req->rbd_dev;
3510        int ret;
3511
3512again:
3513        switch (img_req->state) {
3514        case RBD_IMG_START:
3515                rbd_assert(!*result);
3516
3517                ret = rbd_img_exclusive_lock(img_req);
3518                if (ret < 0) {
3519                        *result = ret;
3520                        return true;
3521                }
3522                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3523                if (ret > 0)
3524                        goto again;
3525                return false;
3526        case RBD_IMG_EXCLUSIVE_LOCK:
3527                if (*result)
3528                        return true;
3529
3530                rbd_assert(!need_exclusive_lock(img_req) ||
3531                           __rbd_is_lock_owner(rbd_dev));
3532
3533                rbd_img_object_requests(img_req);
3534                if (!img_req->pending.num_pending) {
3535                        *result = img_req->pending.result;
3536                        img_req->state = RBD_IMG_OBJECT_REQUESTS;
3537                        goto again;
3538                }
3539                img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3540                return false;
3541        case __RBD_IMG_OBJECT_REQUESTS:
3542                if (!pending_result_dec(&img_req->pending, result))
3543                        return false;
3544                fallthrough;
3545        case RBD_IMG_OBJECT_REQUESTS:
3546                return true;
3547        default:
3548                BUG();
3549        }
3550}
3551
3552/*
3553 * Return true if @img_req is completed.
3554 */
3555static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3556                                     int *result)
3557{
3558        struct rbd_device *rbd_dev = img_req->rbd_dev;
3559        bool done;
3560
3561        if (need_exclusive_lock(img_req)) {
3562                down_read(&rbd_dev->lock_rwsem);
3563                mutex_lock(&img_req->state_mutex);
3564                done = rbd_img_advance(img_req, result);
3565                if (done)
3566                        rbd_lock_del_request(img_req);
3567                mutex_unlock(&img_req->state_mutex);
3568                up_read(&rbd_dev->lock_rwsem);
3569        } else {
3570                mutex_lock(&img_req->state_mutex);
3571                done = rbd_img_advance(img_req, result);
3572                mutex_unlock(&img_req->state_mutex);
3573        }
3574
3575        if (done && *result) {
3576                rbd_assert(*result < 0);
3577                rbd_warn(rbd_dev, "%s%s result %d",
3578                      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3579                      obj_op_name(img_req->op_type), *result);
3580        }
3581        return done;
3582}
3583
3584static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3585{
3586again:
3587        if (!__rbd_img_handle_request(img_req, &result))
3588                return;
3589
3590        if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3591                struct rbd_obj_request *obj_req = img_req->obj_request;
3592
3593                rbd_img_request_destroy(img_req);
3594                if (__rbd_obj_handle_request(obj_req, &result)) {
3595                        img_req = obj_req->img_request;
3596                        goto again;
3597                }
3598        } else {
3599                struct request *rq = blk_mq_rq_from_pdu(img_req);
3600
3601                rbd_img_request_destroy(img_req);
3602                blk_mq_end_request(rq, errno_to_blk_status(result));
3603        }
3604}
3605
3606static const struct rbd_client_id rbd_empty_cid;
3607
3608static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3609                          const struct rbd_client_id *rhs)
3610{
3611        return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3612}
3613
3614static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3615{
3616        struct rbd_client_id cid;
3617
3618        mutex_lock(&rbd_dev->watch_mutex);
3619        cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3620        cid.handle = rbd_dev->watch_cookie;
3621        mutex_unlock(&rbd_dev->watch_mutex);
3622        return cid;
3623}
3624
3625/*
3626 * lock_rwsem must be held for write
3627 */
3628static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3629                              const struct rbd_client_id *cid)
3630{
3631        dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3632             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3633             cid->gid, cid->handle);
3634        rbd_dev->owner_cid = *cid; /* struct copy */
3635}
3636
3637static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3638{
3639        mutex_lock(&rbd_dev->watch_mutex);
3640        sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3641        mutex_unlock(&rbd_dev->watch_mutex);
3642}
3643
3644static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3645{
3646        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3647
3648        rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3649        strcpy(rbd_dev->lock_cookie, cookie);
3650        rbd_set_owner_cid(rbd_dev, &cid);
3651        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3652}
3653
3654/*
3655 * lock_rwsem must be held for write
3656 */
3657static int rbd_lock(struct rbd_device *rbd_dev)
3658{
3659        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3660        char cookie[32];
3661        int ret;
3662
3663        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3664                rbd_dev->lock_cookie[0] != '\0');
3665
3666        format_lock_cookie(rbd_dev, cookie);
3667        ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3668                            RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3669                            RBD_LOCK_TAG, "", 0);
3670        if (ret)
3671                return ret;
3672
3673        __rbd_lock(rbd_dev, cookie);
3674        return 0;
3675}
3676
3677/*
3678 * lock_rwsem must be held for write
3679 */
3680static void rbd_unlock(struct rbd_device *rbd_dev)
3681{
3682        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3683        int ret;
3684
3685        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3686                rbd_dev->lock_cookie[0] == '\0');
3687
3688        ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3689                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
3690        if (ret && ret != -ENOENT)
3691                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3692
3693        /* treat errors as if the image is unlocked */
3694        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3695        rbd_dev->lock_cookie[0] = '\0';
3696        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3697        queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3698}
3699
3700static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3701                                enum rbd_notify_op notify_op,
3702                                struct page ***preply_pages,
3703                                size_t *preply_len)
3704{
3705        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3706        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3707        char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3708        int buf_size = sizeof(buf);
3709        void *p = buf;
3710
3711        dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3712
3713        /* encode *LockPayload NotifyMessage (op + ClientId) */
3714        ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3715        ceph_encode_32(&p, notify_op);
3716        ceph_encode_64(&p, cid.gid);
3717        ceph_encode_64(&p, cid.handle);
3718
3719        return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3720                                &rbd_dev->header_oloc, buf, buf_size,
3721                                RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3722}
3723
3724static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3725                               enum rbd_notify_op notify_op)
3726{
3727        __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3728}
3729
3730static void rbd_notify_acquired_lock(struct work_struct *work)
3731{
3732        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3733                                                  acquired_lock_work);
3734
3735        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3736}
3737
3738static void rbd_notify_released_lock(struct work_struct *work)
3739{
3740        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3741                                                  released_lock_work);
3742
3743        rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3744}
3745
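/*
 * Ask the current lock owner to release the lock.  Returns the owner's
 * ResponseMessage value (0 - owner will release, -EROFS - owner
 * refuses), -ETIMEDOUT if no owner responded, or another error.
 */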
3746static int rbd_request_lock(struct rbd_device *rbd_dev)
3747{
3748        struct page **reply_pages;
3749        size_t reply_len;
3750        bool lock_owner_responded = false;
3751        int ret;
3752
3753        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3754
3755        ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3756                                   &reply_pages, &reply_len);
3757        if (ret && ret != -ETIMEDOUT) {
3758                rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3759                goto out;
3760        }
3761
3762        if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3763                void *p = page_address(reply_pages[0]);
3764                void *const end = p + reply_len;
3765                u32 n;
3766
3767                ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3768                while (n--) {
3769                        u8 struct_v;
3770                        u32 len;
3771
3772                        ceph_decode_need(&p, end, 8 + 8, e_inval);
3773                        p += 8 + 8; /* skip gid and cookie */
3774
3775                        ceph_decode_32_safe(&p, end, len, e_inval);
3776                        if (!len)
3777                                continue;
3778
3779                        if (lock_owner_responded) {
3780                                rbd_warn(rbd_dev,
3781                                         "duplicate lock owners detected");
3782                                ret = -EIO;
3783                                goto out;
3784                        }
3785
3786                        lock_owner_responded = true;
3787                        ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3788                                                  &struct_v, &len);
3789                        if (ret) {
3790                                rbd_warn(rbd_dev,
3791                                         "failed to decode ResponseMessage: %d",
3792                                         ret);
3793                                goto e_inval;
3794                        }
3795
3796                        ret = ceph_decode_32(&p);
3797                }
3798        }
3799
3800        if (!lock_owner_responded) {
3801                rbd_warn(rbd_dev, "no lock owners detected");
3802                ret = -ETIMEDOUT;
3803        }
3804
3805out:
3806        ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3807        return ret;
3808
3809e_inval:
3810        ret = -EINVAL;
3811        goto out;
3812}
3813
3814/*
3815 * Wake whoever is waiting for the lock: either image request state
3816 * machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3817 */
3818static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3819{
3820        struct rbd_img_request *img_req;
3821
3822        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3823        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3824
3825        cancel_delayed_work(&rbd_dev->lock_dwork);
3826        if (!completion_done(&rbd_dev->acquire_wait)) {
3827                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3828                           list_empty(&rbd_dev->running_list));
3829                rbd_dev->acquire_err = result;
3830                complete_all(&rbd_dev->acquire_wait);
3831                return;
3832        }
3833
3834        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3835                mutex_lock(&img_req->state_mutex);
3836                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3837                rbd_img_schedule(img_req, result);
3838                mutex_unlock(&img_req->state_mutex);
3839        }
3840
3841        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3842}
3843
3844static int get_lock_owner_info(struct rbd_device *rbd_dev,
3845                               struct ceph_locker **lockers, u32 *num_lockers)
3846{
3847        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3848        u8 lock_type;
3849        char *lock_tag;
3850        int ret;
3851
3852        dout("%s rbd_dev %p\n", __func__, rbd_dev);
3853
3854        ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3855                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3856                                 &lock_type, &lock_tag, lockers, num_lockers);
3857        if (ret)
3858                return ret;
3859
3860        if (*num_lockers == 0) {
3861                dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3862                goto out;
3863        }
3864
3865        if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3866                rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3867                         lock_tag);
3868                ret = -EBUSY;
3869                goto out;
3870        }
3871
3872        if (lock_type == CEPH_CLS_LOCK_SHARED) {
3873                rbd_warn(rbd_dev, "shared lock type detected");
3874                ret = -EBUSY;
3875                goto out;
3876        }
3877
3878        if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3879                    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3880                rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3881                         (*lockers)[0].id.cookie);
3882                ret = -EBUSY;
3883                goto out;
3884        }
3885
3886out:
3887        kfree(lock_tag);
3888        return ret;
3889}
3890
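/*
 * Return 1 if the locker is still watching the header object (i.e. the
 * lock owner is alive), 0 if its watch is gone and the lock is stale,
 * <0 on error.
 */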
3891static int find_watcher(struct rbd_device *rbd_dev,
3892                        const struct ceph_locker *locker)
3893{
3894        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3895        struct ceph_watch_item *watchers;
3896        u32 num_watchers;
3897        u64 cookie;
3898        int i;
3899        int ret;
3900
3901        ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3902                                      &rbd_dev->header_oloc, &watchers,
3903                                      &num_watchers);
3904        if (ret)
3905                return ret;
3906
3907        sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3908        for (i = 0; i < num_watchers; i++) {
3909                /*
3910                 * Ignore addr->type while comparing.  This mimics
3911                 * entity_addr_t::get_legacy_str() + strcmp().
3912                 */
3913                if (ceph_addr_equal_no_type(&watchers[i].addr,
3914                                            &locker->info.addr) &&
3915                    watchers[i].cookie == cookie) {
3916                        struct rbd_client_id cid = {
3917                                .gid = le64_to_cpu(watchers[i].name.num),
3918                                .handle = cookie,
3919                        };
3920
3921                        dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3922                             rbd_dev, cid.gid, cid.handle);
3923                        rbd_set_owner_cid(rbd_dev, &cid);
3924                        ret = 1;
3925                        goto out;
3926                }
3927        }
3928
3929        dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3930        ret = 0;
3931out:
3932        kfree(watchers);
3933        return ret;
3934}
3935
3936/*
3937 * lock_rwsem must be held for write
3938 */
3939static int rbd_try_lock(struct rbd_device *rbd_dev)
3940{
3941        struct ceph_client *client = rbd_dev->rbd_client->client;
3942        struct ceph_locker *lockers;
3943        u32 num_lockers;
3944        int ret;
3945
3946        for (;;) {
3947                ret = rbd_lock(rbd_dev);
3948                if (ret != -EBUSY)
3949                        return ret;
3950
3951                /* determine if the current lock holder is still alive */
3952                ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3953                if (ret)
3954                        return ret;
3955
3956                if (num_lockers == 0)
3957                        goto again;
3958
3959                ret = find_watcher(rbd_dev, lockers);
3960                if (ret)
3961                        goto out; /* request lock or error */
3962
3963                rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3964                         ENTITY_NAME(lockers[0].id.name));
3965
3966                ret = ceph_monc_blocklist_add(&client->monc,
3967                                              &lockers[0].info.addr);
3968                if (ret) {
3969                        rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3970                                 ENTITY_NAME(lockers[0].id.name), ret);
3971                        goto out;
3972                }
3973
3974                ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3975                                          &rbd_dev->header_oloc, RBD_LOCK_NAME,
3976                                          lockers[0].id.cookie,
3977                                          &lockers[0].id.name);
3978                if (ret && ret != -ENOENT)
3979                        goto out;
3980
3981again:
3982                ceph_free_lockers(lockers, num_lockers);
3983        }
3984
3985out:
3986        ceph_free_lockers(lockers, num_lockers);
3987        return ret;
3988}
3989
3990static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3991{
3992        int ret;
3993
3994        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
3995                ret = rbd_object_map_open(rbd_dev);
3996                if (ret)
3997                        return ret;
3998        }
3999
4000        return 0;
4001}
4002
4003/*
4004 * Return:
4005 *   0 - lock acquired
4006 *   1 - caller should call rbd_request_lock()
4007 *  <0 - error
4008 */
4009static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4010{
4011        int ret;
4012
4013        down_read(&rbd_dev->lock_rwsem);
4014        dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4015             rbd_dev->lock_state);
4016        if (__rbd_is_lock_owner(rbd_dev)) {
4017                up_read(&rbd_dev->lock_rwsem);
4018                return 0;
4019        }
4020
4021        up_read(&rbd_dev->lock_rwsem);
4022        down_write(&rbd_dev->lock_rwsem);
4023        dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4024             rbd_dev->lock_state);
4025        if (__rbd_is_lock_owner(rbd_dev)) {
4026                up_write(&rbd_dev->lock_rwsem);
4027                return 0;
4028        }
4029
4030        ret = rbd_try_lock(rbd_dev);
4031        if (ret < 0) {
4032                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4033                if (ret == -EBLOCKLISTED)
4034                        goto out;
4035
4036                ret = 1; /* request lock anyway */
4037        }
4038        if (ret > 0) {
4039                up_write(&rbd_dev->lock_rwsem);
4040                return ret;
4041        }
4042
4043        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4044        rbd_assert(list_empty(&rbd_dev->running_list));
4045
4046        ret = rbd_post_acquire_action(rbd_dev);
4047        if (ret) {
4048                rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4049                /*
4050                 * Can't stay in RBD_LOCK_STATE_LOCKED because
4051                 * rbd_lock_add_request() would let the request through,
4052                 * assuming that e.g. object map is locked and loaded.
4053                 */
4054                rbd_unlock(rbd_dev);
4055        }
4056
4057out:
4058        wake_lock_waiters(rbd_dev, ret);
4059        up_write(&rbd_dev->lock_rwsem);
4060        return ret;
4061}
4062
4063static void rbd_acquire_lock(struct work_struct *work)
4064{
4065        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4066                                            struct rbd_device, lock_dwork);
4067        int ret;
4068
4069        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4070again:
4071        ret = rbd_try_acquire_lock(rbd_dev);
4072        if (ret <= 0) {
4073                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4074                return;
4075        }
4076
4077        ret = rbd_request_lock(rbd_dev);
4078        if (ret == -ETIMEDOUT) {
4079                goto again; /* treat this as a dead client */
4080        } else if (ret == -EROFS) {
4081                rbd_warn(rbd_dev, "peer will not release lock");
4082                down_write(&rbd_dev->lock_rwsem);
4083                wake_lock_waiters(rbd_dev, ret);
4084                up_write(&rbd_dev->lock_rwsem);
4085        } else if (ret < 0) {
4086                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4087                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4088                                 RBD_RETRY_DELAY);
4089        } else {
4090                /*
4091                 * lock owner acked, but resend if we don't see them
4092                 * release the lock
4093                 */
4094                dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4095                     rbd_dev);
4096                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4097                    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4098        }
4099}
4100
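/*
 * Switch to RBD_LOCK_STATE_RELEASING and wait for in-flight image
 * requests to drain from running_list -- the last one to finish
 * completes releasing_wait in rbd_lock_del_request().  Returns false
 * if the lock state changed while lock_rwsem was dropped.
 */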
4101static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4102{
4103        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4104        lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4105
4106        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4107                return false;
4108
4109        /*
4110         * Ensure that all in-flight IO is flushed.
4111         */
4112        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4113        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4114        if (list_empty(&rbd_dev->running_list))
4115                return true;
4116
4117        up_write(&rbd_dev->lock_rwsem);
4118        wait_for_completion(&rbd_dev->releasing_wait);
4119
4120        down_write(&rbd_dev->lock_rwsem);
4121        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4122                return false;
4123
4124        rbd_assert(list_empty(&rbd_dev->running_list));
4125        return true;
4126}
4127
4128static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4129{
4130        if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4131                rbd_object_map_close(rbd_dev);
4132}
4133
4134static void __rbd_release_lock(struct rbd_device *rbd_dev)
4135{
4136        rbd_assert(list_empty(&rbd_dev->running_list));
4137
4138        rbd_pre_release_action(rbd_dev);
4139        rbd_unlock(rbd_dev);
4140}
4141
4142/*
4143 * lock_rwsem must be held for write
4144 */
4145static void rbd_release_lock(struct rbd_device *rbd_dev)
4146{
4147        if (!rbd_quiesce_lock(rbd_dev))
4148                return;
4149
4150        __rbd_release_lock(rbd_dev);
4151
4152        /*
4153         * Give others a chance to grab the lock - we would re-acquire
4154         * almost immediately if we got new IO while draining the running
4155         * list otherwise.  We need to ack our own notifications, so this
4156         * lock_dwork will be requeued from rbd_handle_released_lock() by
4157         * way of maybe_kick_acquire().
4158         */
4159        cancel_delayed_work(&rbd_dev->lock_dwork);
4160}
4161
4162static void rbd_release_lock_work(struct work_struct *work)
4163{
4164        struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4165                                                  unlock_work);
4166
4167        down_write(&rbd_dev->lock_rwsem);
4168        rbd_release_lock(rbd_dev);
4169        up_write(&rbd_dev->lock_rwsem);
4170}
4171
4172static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4173{
4174        bool have_requests;
4175
4176        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4177        if (__rbd_is_lock_owner(rbd_dev))
4178                return;
4179
4180        spin_lock(&rbd_dev->lock_lists_lock);
4181        have_requests = !list_empty(&rbd_dev->acquiring_list);
4182        spin_unlock(&rbd_dev->lock_lists_lock);
4183        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4184                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4185                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4186        }
4187}
4188
4189static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4190                                     void **p)
4191{
4192        struct rbd_client_id cid = { 0 };
4193
4194        if (struct_v >= 2) {
4195                cid.gid = ceph_decode_64(p);
4196                cid.handle = ceph_decode_64(p);
4197        }
4198
4199        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4200             cid.handle);
4201        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4202                down_write(&rbd_dev->lock_rwsem);
4203                if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4204                        dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4205                             __func__, rbd_dev, cid.gid, cid.handle);
4206                } else {
4207                        rbd_set_owner_cid(rbd_dev, &cid);
4208                }
4209                downgrade_write(&rbd_dev->lock_rwsem);
4210        } else {
4211                down_read(&rbd_dev->lock_rwsem);
4212        }
4213
4214        maybe_kick_acquire(rbd_dev);
4215        up_read(&rbd_dev->lock_rwsem);
4216}
4217
4218static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4219                                     void **p)
4220{
4221        struct rbd_client_id cid = { 0 };
4222
4223        if (struct_v >= 2) {
4224                cid.gid = ceph_decode_64(p);
4225                cid.handle = ceph_decode_64(p);
4226        }
4227
4228        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4229             cid.handle);
4230        if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4231                down_write(&rbd_dev->lock_rwsem);
4232                if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4233                        dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4234                             __func__, rbd_dev, cid.gid, cid.handle,
4235                             rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4236                } else {
4237                        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4238                }
4239                downgrade_write(&rbd_dev->lock_rwsem);
4240        } else {
4241                down_read(&rbd_dev->lock_rwsem);
4242        }
4243
4244        maybe_kick_acquire(rbd_dev);
4245        up_read(&rbd_dev->lock_rwsem);
4246}
4247
4248/*
4249 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4250 * ResponseMessage is needed.
4251 */
4252static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4253                                   void **p)
4254{
4255        struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4256        struct rbd_client_id cid = { 0 };
4257        int result = 1;
4258
4259        if (struct_v >= 2) {
4260                cid.gid = ceph_decode_64(p);
4261                cid.handle = ceph_decode_64(p);
4262        }
4263
4264        dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4265             cid.handle);
4266        if (rbd_cid_equal(&cid, &my_cid))
4267                return result;
4268
4269        down_read(&rbd_dev->lock_rwsem);
4270        if (__rbd_is_lock_owner(rbd_dev)) {
4271                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4272                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4273                        goto out_unlock;
4274
4275                /*
4276                 * encode ResponseMessage(0) so the peer can detect
4277                 * a missing owner
4278                 */
4279                result = 0;
4280
4281                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4282                        if (!rbd_dev->opts->exclusive) {
4283                                dout("%s rbd_dev %p queueing unlock_work\n",
4284                                     __func__, rbd_dev);
4285                                queue_work(rbd_dev->task_wq,
4286                                           &rbd_dev->unlock_work);
4287                        } else {
4288                                /* refuse to release the lock */
4289                                result = -EROFS;
4290                        }
4291                }
4292        }
4293
4294out_unlock:
4295        up_read(&rbd_dev->lock_rwsem);
4296        return result;
4297}
4298
4299static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4300                                     u64 notify_id, u64 cookie, s32 *result)
4301{
4302        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4303        char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4304        int buf_size = sizeof(buf);
4305        int ret;
4306
4307        if (result) {
4308                void *p = buf;
4309
4310                /* encode ResponseMessage */
4311                ceph_start_encoding(&p, 1, 1,
4312                                    buf_size - CEPH_ENCODING_START_BLK_LEN);
4313                ceph_encode_32(&p, *result);
4314        } else {
4315                buf_size = 0;
4316        }
4317
4318        ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4319                                   &rbd_dev->header_oloc, notify_id, cookie,
4320                                   buf, buf_size);
4321        if (ret)
4322                rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4323}
4324
4325static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4326                                   u64 cookie)
4327{
4328        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4329        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4330}
4331
4332static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4333                                          u64 notify_id, u64 cookie, s32 result)
4334{
4335        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4336        __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4337}
4338
4339static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4340                         u64 notifier_id, void *data, size_t data_len)
4341{
4342        struct rbd_device *rbd_dev = arg;
4343        void *p = data;
4344        void *const end = p + data_len;
4345        u8 struct_v = 0;
4346        u32 len;
4347        u32 notify_op;
4348        int ret;
4349
4350        dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4351             __func__, rbd_dev, cookie, notify_id, data_len);
4352        if (data_len) {
4353                ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4354                                          &struct_v, &len);
4355                if (ret) {
4356                        rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4357                                 ret);
4358                        return;
4359                }
4360
4361                notify_op = ceph_decode_32(&p);
4362        } else {
4363                /* legacy notification for header updates */
4364                notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4365                len = 0;
4366        }
4367
4368        dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4369        switch (notify_op) {
4370        case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4371                rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4372                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4373                break;
4374        case RBD_NOTIFY_OP_RELEASED_LOCK:
4375                rbd_handle_released_lock(rbd_dev, struct_v, &p);
4376                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4377                break;
4378        case RBD_NOTIFY_OP_REQUEST_LOCK:
4379                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4380                if (ret <= 0)
4381                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4382                                                      cookie, ret);
4383                else
4384                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4385                break;
4386        case RBD_NOTIFY_OP_HEADER_UPDATE:
4387                ret = rbd_dev_refresh(rbd_dev);
4388                if (ret)
4389                        rbd_warn(rbd_dev, "refresh failed: %d", ret);
4390
4391                rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4392                break;
4393        default:
4394                if (rbd_is_lock_owner(rbd_dev))
4395                        rbd_acknowledge_notify_result(rbd_dev, notify_id,
4396                                                      cookie, -EOPNOTSUPP);
4397                else
4398                        rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4399                break;
4400        }
4401}
4402
4403static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4404
4405static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4406{
4407        struct rbd_device *rbd_dev = arg;
4408
4409        rbd_warn(rbd_dev, "encountered watch error: %d", err);
4410
4411        down_write(&rbd_dev->lock_rwsem);
4412        rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4413        up_write(&rbd_dev->lock_rwsem);
4414
4415        mutex_lock(&rbd_dev->watch_mutex);
4416        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4417                __rbd_unregister_watch(rbd_dev);
4418                rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4419
4420                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4421        }
4422        mutex_unlock(&rbd_dev->watch_mutex);
4423}
4424
4425/*
4426 * watch_mutex must be locked
4427 */
4428static int __rbd_register_watch(struct rbd_device *rbd_dev)
4429{
4430        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4431        struct ceph_osd_linger_request *handle;
4432
4433        rbd_assert(!rbd_dev->watch_handle);
4434        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4435
4436        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4437                                 &rbd_dev->header_oloc, rbd_watch_cb,
4438                                 rbd_watch_errcb, rbd_dev);
4439        if (IS_ERR(handle))
4440                return PTR_ERR(handle);
4441
4442        rbd_dev->watch_handle = handle;
4443        return 0;
4444}
4445
4446/*
4447 * watch_mutex must be locked
4448 */
4449static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4450{
4451        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4452        int ret;
4453
4454        rbd_assert(rbd_dev->watch_handle);
4455        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4456
4457        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4458        if (ret)
4459                rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4460
4461        rbd_dev->watch_handle = NULL;
4462}
4463
4464static int rbd_register_watch(struct rbd_device *rbd_dev)
4465{
4466        int ret;
4467
4468        mutex_lock(&rbd_dev->watch_mutex);
4469        rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4470        ret = __rbd_register_watch(rbd_dev);
4471        if (ret)
4472                goto out;
4473
4474        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4475        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4476
4477out:
4478        mutex_unlock(&rbd_dev->watch_mutex);
4479        return ret;
4480}
4481
4482static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4483{
4484        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4485
4486        cancel_work_sync(&rbd_dev->acquired_lock_work);
4487        cancel_work_sync(&rbd_dev->released_lock_work);
4488        cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4489        cancel_work_sync(&rbd_dev->unlock_work);
4490}
4491
4492/*
4493 * header_rwsem must not be held to avoid a deadlock with
4494 * rbd_dev_refresh() when flushing notifies.
4495 */
4496static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4497{
4498        cancel_tasks_sync(rbd_dev);
4499
4500        mutex_lock(&rbd_dev->watch_mutex);
4501        if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4502                __rbd_unregister_watch(rbd_dev);
4503        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4504        mutex_unlock(&rbd_dev->watch_mutex);
4505
4506        cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4507        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4508}
4509
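/*
 * Re-assert the lock under the new watch cookie after the watch has
 * been re-established.  If the OSD doesn't support updating the lock
 * cookie in place, release the lock and schedule a fresh acquire.
 */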
4510/*
4511 * lock_rwsem must be held for write
4512 */
4513static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4514{
4515        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4516        char cookie[32];
4517        int ret;
4518
4519        if (!rbd_quiesce_lock(rbd_dev))
4520                return;
4521
4522        format_lock_cookie(rbd_dev, cookie);
4523        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4524                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4525                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4526                                  RBD_LOCK_TAG, cookie);
4527        if (ret) {
4528                if (ret != -EOPNOTSUPP)
4529                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4530                                 ret);
4531
4532                /*
4533                 * Lock cookie cannot be updated on older OSDs, so do
4534                 * a manual release and queue an acquire.
4535                 */
4536                __rbd_release_lock(rbd_dev);
4537                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4538        } else {
4539                __rbd_lock(rbd_dev, cookie);
4540                wake_lock_waiters(rbd_dev, 0);
4541        }
4542}
4543
4544static void rbd_reregister_watch(struct work_struct *work)
4545{
4546        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4547                                            struct rbd_device, watch_dwork);
4548        int ret;
4549
4550        dout("%s rbd_dev %p\n", __func__, rbd_dev);
4551
4552        mutex_lock(&rbd_dev->watch_mutex);
4553        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4554                mutex_unlock(&rbd_dev->watch_mutex);
4555                return;
4556        }
4557
4558        ret = __rbd_register_watch(rbd_dev);
4559        if (ret) {
4560                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4561                if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4562                        queue_delayed_work(rbd_dev->task_wq,
4563                                           &rbd_dev->watch_dwork,
4564                                           RBD_RETRY_DELAY);
4565                        mutex_unlock(&rbd_dev->watch_mutex);
4566                        return;
4567                }
4568
4569                mutex_unlock(&rbd_dev->watch_mutex);
4570                down_write(&rbd_dev->lock_rwsem);
4571                wake_lock_waiters(rbd_dev, ret);
4572                up_write(&rbd_dev->lock_rwsem);
4573                return;
4574        }
4575
4576        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4577        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4578        mutex_unlock(&rbd_dev->watch_mutex);
4579
4580        down_write(&rbd_dev->lock_rwsem);
4581        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4582                rbd_reacquire_lock(rbd_dev);
4583        up_write(&rbd_dev->lock_rwsem);
4584
4585        ret = rbd_dev_refresh(rbd_dev);
4586        if (ret)
4587                rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4588}
4589
4590/*
4591 * Synchronous osd object method call.  Returns the number of bytes
4592 * returned in the inbound buffer, or a negative error code.
4593 */
4594static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4595                             struct ceph_object_id *oid,
4596                             struct ceph_object_locator *oloc,
4597                             const char *method_name,
4598                             const void *outbound,
4599                             size_t outbound_size,
4600                             void *inbound,
4601                             size_t inbound_size)
4602{
4603        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4604        struct page *req_page = NULL;
4605        struct page *reply_page;
4606        int ret;
4607
4608        /*
4609         * Method calls are ultimately read operations.  The result
4610         * should be placed into the inbound buffer provided.  They
4611         * also supply outbound data--parameters for the object
4612         * method.  Currently if this is present it will be a
4613         * snapshot id.
4614         */
4615        if (outbound) {
4616                if (outbound_size > PAGE_SIZE)
4617                        return -E2BIG;
4618
4619                req_page = alloc_page(GFP_KERNEL);
4620                if (!req_page)
4621                        return -ENOMEM;
4622
4623                memcpy(page_address(req_page), outbound, outbound_size);
4624        }
4625
4626        reply_page = alloc_page(