linux/net/ceph/osd_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN        4096
#define OSD_OPREPLY_FRONT_LEN   512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);

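/*
 * Ops that carry variable-length payload data (xattr names and values,
 * object class calls, notify payloads) append it to the request's
 * trailing pagelist instead of the fixed-size message front.
 */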
static int op_needs_trail(int op)
{
        switch (op) {
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY:
                return 1;
        default:
                return 0;
        }
}

static int op_has_extent(int op)
{
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
}

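/*
 * Map the file extent off~*plen onto a single object: *bno gets the
 * object number, and *plen is shortened if the extent crosses an
 * object boundary.  The resulting object extent is recorded in @op.
 */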
int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
                        u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
{
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
        int r;

        reqhead->snapid = cpu_to_le64(snapid);

        /* object extent? */
        r = ceph_calc_file_object_mapping(layout, off, plen, bno,
                                          &objoff, &objlen);
        if (r < 0)
                return r;
        if (*plen < orig_len)
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);

        if (op_has_extent(op->op)) {
                op->extent.offset = objoff;
                op->extent.length = objlen;
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
        if (op->op == CEPH_OSD_OP_WRITE)
                op->payload_len = *plen;

        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
             *bno, objoff, objlen, req->r_num_pages);
        return 0;
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_osd_client *osdc,
                       struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 *plen,
                       struct ceph_osd_request *req,
                       struct ceph_osd_req_op *op)
{
        u64 bno;
        int r;

        r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
                                 plen, &bno, req, op);
        if (r < 0)
                return r;

        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);

        return r;
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                                    struct ceph_osd_request,
                                                    r_kref);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
                dout("%s revoking pages %p from con %p\n", __func__,
                     req->r_pages, req->r_con_filling_msg);
                ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
        }
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
#ifdef CONFIG_BLOCK
        if (req->r_bio)
                bio_put(req->r_bio);
#endif
        ceph_put_snap_context(req->r_snapc);
        if (req->r_trail) {
                ceph_pagelist_release(req->r_trail);
                kfree(req->r_trail);
        }
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
                kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

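/*
 * Count the ops in a request; the ops array is terminated by an entry
 * with op == 0.  Optionally report whether any op needs a trail.
 */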
static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
        int i = 0;

        if (needs_trail)
                *needs_trail = 0;
        while (ops[i].op) {
                if (needs_trail && op_needs_trail(ops[i].op))
                        *needs_trail = 1;
                i++;
        }

        return i;
}

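/*
 * Allocate a request along with its reply and request messages, either
 * from the osdc mempools or from the heap.  Returns NULL on failure;
 * on success the caller owns a reference (drop it with
 * ceph_osdc_put_request()).
 */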
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               int flags,
                                               struct ceph_snap_context *snapc,
                                               struct ceph_osd_req_op *ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags,
                                               struct page **pages,
                                               struct bio *bio)
{
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        int needs_trail;
        int num_op = get_num_ops(ops, &needs_trail);
        size_t msg_size = sizeof(struct ceph_osd_request_head);

        msg_size += num_op*sizeof(struct ceph_osd_op);

        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
                memset(req, 0, sizeof(*req));
        } else {
                req = kzalloc(sizeof(*req), gfp_flags);
        }
        if (req == NULL)
                return NULL;

        req->r_osdc = osdc;
        req->r_mempool = use_mempool;

        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);

        req->r_flags = flags;

        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
                                   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }
        req->r_reply = msg;

        /* allocate space for the trailing data */
        if (needs_trail) {
                req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
                if (!req->r_trail) {
                        ceph_osdc_put_request(req);
                        return NULL;
                }
                ceph_pagelist_init(req->r_trail);
        }

        /* create request message; allow space for oid */
        msg_size += MAX_OBJ_NAME_SIZE;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }

        memset(msg->front.iov_base, 0, msg->front.iov_len);

        req->r_request = msg;
        req->r_pages = pages;
#ifdef CONFIG_BLOCK
        if (bio) {
                req->r_bio = bio;
                bio_get(req->r_bio);
        }
#endif

        return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

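/*
 * Translate a host-order ceph_osd_req_op into the little-endian wire
 * format ceph_osd_op, appending any variable-length payload to the
 * request's trail pagelist.
 */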
static void osd_req_encode_op(struct ceph_osd_request *req,
                              struct ceph_osd_op *dst,
                              struct ceph_osd_req_op *src)
{
        dst->op = cpu_to_le16(src->op);

        switch (src->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                dst->extent.offset =
                        cpu_to_le64(src->extent.offset);
                dst->extent.length =
                        cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                break;

        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                BUG_ON(!req->r_trail);

                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
                ceph_pagelist_append(req->r_trail, src->xattr.name,
                                     src->xattr.name_len);
                ceph_pagelist_append(req->r_trail, src->xattr.val,
                                     src->xattr.value_len);
                break;
        case CEPH_OSD_OP_CALL:
                BUG_ON(!req->r_trail);

                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

                ceph_pagelist_append(req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
                ceph_pagelist_append(req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
                ceph_pagelist_append(req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_ROLLBACK:
                dst->snap.snapid = cpu_to_le64(src->snap.snapid);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
        case CEPH_OSD_OP_NOTIFY:
                {
                        __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
                        __le32 timeout = cpu_to_le32(src->watch.timeout);

                        BUG_ON(!req->r_trail);

                        ceph_pagelist_append(req->r_trail,
                                                &prot_ver, sizeof(prot_ver));
                        ceph_pagelist_append(req->r_trail,
                                                &timeout, sizeof(timeout));
                }
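                /* fall through: NOTIFY also encodes the watch fields */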
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                dst->watch.ver = cpu_to_le64(src->watch.ver);
                dst->watch.flag = src->watch.flag;
                break;
        default:
                /* print the host-order opcode; dst->op is already le16 */
                pr_err("unrecognized osd opcode %d\n", src->op);
                WARN_ON(1);
                break;
        }
        dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
                             u64 off, u64 *plen,
                             struct ceph_osd_req_op *src_ops,
                             struct ceph_snap_context *snapc,
                             struct timespec *mtime,
                             const char *oid,
                             int oid_len)
{
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
        struct ceph_osd_op *op;
        void *p;
        int num_op = get_num_ops(src_ops, NULL);
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
        int flags = req->r_flags;
        u64 data_len = 0;
        int i;

        head = msg->front.iov_base;
        op = (void *)(head + 1);
        p = (void *)(op + num_op);

        req->r_snapc = ceph_get_snap_context(snapc);

        head->client_inc = cpu_to_le32(1); /* always, for now. */
        head->flags = cpu_to_le32(flags);
        if (flags & CEPH_OSD_FLAG_WRITE)
                ceph_encode_timespec(&head->mtime, mtime);
        head->num_ops = cpu_to_le16(num_op);

        /* fill in oid */
        head->object_len = cpu_to_le32(oid_len);
        memcpy(p, oid, oid_len);
        p += oid_len;

        src_op = src_ops;
        while (src_op->op) {
                osd_req_encode_op(req, op, src_op);
                src_op++;
                op++;
        }

        if (req->r_trail)
                data_len += req->r_trail->length;

        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
                head->num_snaps = cpu_to_le32(snapc->num_snaps);
                for (i = 0; i < snapc->num_snaps; i++) {
                        put_unaligned_le64(snapc->snaps[i], p);
                        p += sizeof(u64);
                }
        }

        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
                req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
        } else if (data_len) {
                req->r_request->hdr.data_off = 0;
                req->r_request->hdr.data_len = cpu_to_le32(data_len);
        }

        req->r_request->page_alignment = req->r_page_alignment;

        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
        msg->hdr.front_len = cpu_to_le32(msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
                                               bool use_mempool, int num_reply,
                                               int page_align)
{
        struct ceph_osd_req_op ops[3];
        struct ceph_osd_request *req;
        int r;

        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
        ops[0].extent.truncate_size = truncate_size;
        ops[0].payload_len = 0;

        if (do_sync) {
                ops[1].op = CEPH_OSD_OP_STARTSYNC;
                ops[1].payload_len = 0;
                ops[2].op = 0;
        } else
                ops[1].op = 0;

        req = ceph_osdc_alloc_request(osdc, flags,
                                         snapc, ops,
                                         use_mempool,
                                         GFP_NOFS, NULL, NULL);
        if (!req)
                return ERR_PTR(-ENOMEM);

        /* calculate max write size */
        r = calc_layout(osdc, vino, layout, off, plen, req, ops);
        if (r < 0) {
                ceph_osdc_put_request(req);    /* don't leak req on error */
                return ERR_PTR(r);
        }
        req->r_file_layout = *layout;  /* keep a copy */

        /* in case it differs from natural (file) alignment that
           calc_layout filled in for us */
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;

        ceph_osdc_build_request(req, off, plen, ops,
                                snapc,
                                mtime,
                                req->r_oid, req->r_oid_len);

        return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
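
/*
 * Example (sketch, not from this file): a typical one-object read
 * might be built as
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 1, page_align);
 *
 * after which the caller attaches req->r_pages and submits the request
 * with ceph_osdc_start_request().
 */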

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *new)
{
        struct rb_node **p = &osdc->requests.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_osd_request, r_node);
                if (new->r_tid < req->r_tid)
                        p = &(*p)->rb_left;
                else if (new->r_tid > req->r_tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->r_node, parent, p);
        rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
                                                 u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid)
                        n = n->rb_left;
                else if (tid > req->r_tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

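/*
 * Find the request with the lowest tid >= the given tid, or NULL if
 * there is none.
 */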
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
                    u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid) {
                        if (!n->rb_left)
                                return req;
                        n = n->rb_left;
                } else if (tid > req->r_tid) {
                        n = n->rb_right;
                } else {
                        return req;
                }
        }
        return NULL;
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
{
        struct ceph_osd_request *req, *nreq;
        int err;

        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;

        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
                dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
                if (!req->r_linger)
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
        }

        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd) {
                /*
                 * reregister request prior to unregistering linger so
                 * that r_osd is preserved.
                 */
                BUG_ON(!list_empty(&req->r_req_lru_item));
                __register_request(osdc, req);
                list_add(&req->r_req_lru_item, &osdc->req_unsent);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
                __unregister_linger_request(osdc, req);
                dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
        }
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;

        if (!osd)
                return;
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __kick_osd_requests(osdc, osd);
        mutex_unlock(&osdc->request_mutex);
        send_queued(osdc);
        up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
        struct ceph_osd *osd;

        osd = kzalloc(sizeof(*osd), GFP_NOFS);
        if (!osd)
                return NULL;

        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        osd->o_osd = onum;
        RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;

        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
}

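/*
 * get_osd() can fail: once o_ref has dropped to zero the struct is
 * being torn down, so callers must handle a NULL return.
 */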
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
        if (atomic_inc_not_zero(&osd->o_ref)) {
                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
                     atomic_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
                return NULL;
        }
}

static void put_osd(struct ceph_osd *osd)
{
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
        if (atomic_dec_and_test(&osd->o_ref)) {
                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

                if (osd->o_auth.authorizer && ac->ops &&
                    ac->ops->destroy_authorizer)
                        ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
                kfree(osd);    /* free even if we had no authorizer */
        }
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));
        rb_erase(&osd->o_node, &osdc->osds);
        list_del_init(&osd->o_osd_lru);
        ceph_con_close(&osd->o_con);
        put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
        dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
                                                struct ceph_osd, o_node);
                __remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
{
        dout("__move_osd_to_lru %p\n", osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
        dout("__remove_osd_from_lru %p\n", osd);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
        struct ceph_osd *osd, *nosd;

        dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
                __remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect.  Returns -ENODEV if the osd was idle and has been
 * removed, -EAGAIN if the connection was left alone for the messenger
 * to retry, or 0 if the connection was reopened.
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        struct ceph_osd_request *req;
        int ret = 0;

        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
                ret = -ENODEV;
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
                          sizeof(osd->o_con.peer_addr)) == 0 &&
                   !ceph_con_opened(&osd->o_con)) {
                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
                ret = -EAGAIN;
        } else {
                ceph_con_close(&osd->o_con);
                ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
                              &osdc->osdmap->osd_addr[osd->o_osd]);
                osd->o_incarnation++;
        }
        return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
        struct rb_node **p = &osdc->osds.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd *osd = NULL;

        dout("__insert_osd %p osd%d\n", new, new->o_osd);
        while (*p) {
                parent = *p;
                osd = rb_entry(parent, struct ceph_osd, o_node);
                if (new->o_osd < osd->o_osd)
                        p = &(*p)->rb_left;
                else if (new->o_osd > osd->o_osd)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->o_node, parent, p);
        rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
        struct ceph_osd *osd;
        struct rb_node *n = osdc->osds.rb_node;

        while (n) {
                osd = rb_entry(n, struct ceph_osd, o_node);
                if (o < osd->o_osd)
                        n = n->rb_left;
                else if (o > osd->o_osd)
                        n = n->rb_right;
                else
                        return osd;
        }
        return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
        schedule_delayed_work(&osdc->timeout_work,
                        osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
        cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req)
{
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
        if (osdc->num_requests == 1) {
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
        }
}

static void register_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *req)
{
        mutex_lock(&osdc->request_mutex);
        __register_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
{
        if (RB_EMPTY_NODE(&req->r_node)) {
                dout("__unregister_request %p tid %lld not registered\n",
                        req, req->r_tid);
                return;
        }

        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
        RB_CLEAR_NODE(&req->r_node);    /* so a repeat call is a no-op */
        osdc->num_requests--;

        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
                ceph_msg_revoke(req->r_request);

                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
                    list_empty(&req->r_osd->o_linger_requests)) {
                        dout("moving osd %p to lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
                }
                if (list_empty(&req->r_linger_item))
                        req->r_osd = NULL;
        }

        list_del_init(&req->r_req_lru_item);
        ceph_osdc_put_request(req);

        if (osdc->num_requests == 0) {
                dout(" no requests, canceling timeout\n");
                __cancel_osd_timeout(osdc);
        }
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
        if (req->r_sent && req->r_osd) {
                ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
}

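/*
 * Linger requests (e.g. watch) stay registered after completion so
 * they can be resent whenever their mapping to an OSD changes.
 */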
static void __register_linger_request(struct ceph_osd_client *osdc,
                                    struct ceph_osd_request *req)
{
        dout("__register_linger_request %p\n", req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
        if (req->r_osd)
                list_add_tail(&req->r_linger_osd,
                              &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
{
        dout("__unregister_linger_request %p\n", req);
        list_del_init(&req->r_linger_item);
        if (req->r_osd) {
                list_del_init(&req->r_linger_osd);

                if (list_empty(&req->r_osd->o_requests) &&
                    list_empty(&req->r_osd->o_linger_requests)) {
                        dout("moving osd %p to lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
                }
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
                                         struct ceph_osd_request *req)
{
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
                __unregister_linger_request(osdc, req);
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
{
        if (!req->r_linger) {
                dout("set_request_linger %p\n", req);
                req->r_linger = 1;
                /*
                 * caller is now responsible for calling
                 * unregister_linger_request
                 */
                ceph_osdc_get_request(req);
        }
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate
 * list (unsent, notarget) or leave it on the in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req, int force_resend)
{
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
        int acting[CEPH_PG_MAX_SIZE];
        int o = -1, num = 0;
        int err;

        dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
        if (err) {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
        }
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;

        err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
        if (err > 0) {
                o = acting[0];
                num = err;
        }

        if ((!force_resend &&
             req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation &&
             req->r_num_pg_osds == num &&
             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */

        dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);

        /* record full pg acting set */
        memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
        req->r_num_pg_osds = num;

        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
                req->r_osd = NULL;
        }

        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
                req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }

                dout("map_request osd %p is osd%d\n", req->r_osd, o);
                __insert_osd(osdc, req->r_osd);

                ceph_con_open(&req->r_osd->o_con,
                              CEPH_ENTITY_TYPE_OSD, o,
                              &osdc->osdmap->osd_addr[o]);
        }

        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
        } else {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
        }
        err = 1;   /* osd or pg changed */

out:
        return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
{
        struct ceph_osd_request_head *reqhead;

        dout("send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags);

        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;

        req->r_stamp = jiffies;
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
        req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
        struct ceph_osd_request *req, *tmp;

        dout("send_queued\n");
        mutex_lock(&osdc->request_mutex);
        list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
                __send_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with timed-out requests to ensure any
 * communications channel reset is detected, then reschedule the
 * timeout event another N seconds in the future (unless there are no
 * open requests).
 */
static void handle_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
        struct list_head slow_osds;

        dout("timeout\n");
        down_read(&osdc->map_sem);

        ceph_monc_request_next_osdmap(&osdc->client->monc);

        mutex_lock(&osdc->request_mutex);

        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
         */
        INIT_LIST_HEAD(&slow_osds);
        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
                if (time_before(jiffies, req->r_stamp + keepalive))
                        break;

                osd = req->r_osd;
                BUG_ON(!osd);
                dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
                list_move_tail(&osd->o_keepalive_item, &slow_osds);
        }
        while (!list_empty(&slow_osds)) {
                osd = list_entry(slow_osds.next, struct ceph_osd,
                                 o_keepalive_item);
                list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }

        __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
        send_queued(osdc);
        up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client,
                             osds_timeout_work.work);
        unsigned long delay =
                osdc->client->options->osd_idle_ttl * HZ >> 2;

        dout("osds timeout\n");
        down_read(&osdc->map_sem);
        remove_old_osds(osdc);
        up_read(&osdc->map_sem);

        schedule_delayed_work(&osdc->osds_timeout_work,
                              round_jiffies_relative(delay));
}

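/*
 * A request is "safe" once the osd reports the write committed to
 * disk; fire the optional safe callback and wake any fsync waiters.
 */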
static void complete_request(struct ceph_osd_request *req)
{
        if (req->r_safe_callback)
                req->r_safe_callback(req, NULL);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.  reads complete
 * on the first reply; writes may see an initial ack followed by a
 * final on-disk (ONDISK) commit.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
{
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
        u64 tid;
        int numops, object_len, flags;
        s32 result;

        tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
        numops = le32_to_cpu(rhead->num_ops);
        object_len = le32_to_cpu(rhead->object_len);
        result = le32_to_cpu(rhead->result);
        if (msg->front.iov_len != sizeof(*rhead) + object_len +
            numops * sizeof(struct ceph_osd_op))
                goto bad;
        dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
                dout("handle_reply tid %llu dne\n", tid);
                mutex_unlock(&osdc->request_mutex);
                return;
        }
        ceph_osdc_get_request(req);
        flags = le32_to_cpu(rhead->flags);

        /*
         * if this connection filled our message, drop our reference now, to
         * avoid a (safe but slower) revoke later.
         */
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
                dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
                con->ops->put(con);
        }

        if (!req->r_got_reply) {
                unsigned int bytes;

                req->r_result = le32_to_cpu(rhead->result);
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                if (req->r_result == 0)
                        req->r_result = bytes;

                /* in case this is a write and we need to replay, */
                req->r_reassert_version = rhead->reassert_version;

                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                dout("handle_reply tid %llu dup ack\n", tid);
                mutex_unlock(&osdc->request_mutex);
                goto done;
        }

        dout("handle_reply tid %llu flags %d\n", tid, flags);

        if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
                __register_linger_request(osdc, req);

        /* either this is a read, or we got the safe response */
        if (result < 0 ||
            (flags & CEPH_OSD_FLAG_ONDISK) ||
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);

        mutex_unlock(&osdc->request_mutex);

        if (req->r_callback)
                req->r_callback(req, msg);
        else
                complete_all(&req->r_completion);

        if (flags & CEPH_OSD_FLAG_ONDISK)
                complete_request(req);

done:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;

bad:
        pr_err("corrupt osd_op_reply got %d %d expected %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
               (int)sizeof(*rhead));
        ceph_msg_dump(msg);
}

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
        struct rb_node *p, *n;

        for (p = rb_first(&osdc->osds); p; p = n) {
                struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

                n = rb_next(p);
                if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
                    memcmp(&osd->o_con.peer_addr,
                           ceph_osd_addr(osdc->osdmap,
                                         osd->o_osd),
                           sizeof(struct ceph_entity_addr)) != 0)
                        __reset_osd(osdc, osd);
        }
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
        struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;

        dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; ) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
                p = rb_next(p);

                /*
                 * For linger requests that have not yet been
                 * registered, move them to the linger list; they'll
                 * be sent to the osd in the loop below.  Unregister
                 * the request before re-registering it as a linger
                 * request to ensure the __map_request() below
                 * will decide it needs to be sent.
                 */
                if (req->r_linger && list_empty(&req->r_linger_item)) {
                        dout("%p tid %llu restart on osd%d\n",
                             req, req->r_tid,
                             req->r_osd ? req->r_osd->o_osd : -1);
                        __unregister_request(osdc, req);
                        __register_linger_request(osdc, req);
                        continue;
                }

                err = __map_request(osdc, req, force_resend);
                if (err < 0)
                        continue;  /* error */
                if (req->r_osd == NULL) {
                        dout("%p tid %llu maps to no osd\n", req, req->r_tid);
                        needmap++;  /* request a newer map */
                } else if (err > 0) {
                        if (!req->r_linger) {
                                dout("%p tid %llu requeued on osd%d\n", req,
                                     req->r_tid,
                                     req->r_osd ? req->r_osd->o_osd : -1);
                                req->r_flags |= CEPH_OSD_FLAG_RETRY;
                        }
                }
        }

        list_for_each_entry_safe(req, nreq, &osdc->req_linger,
                                 r_linger_item) {
                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

                err = __map_request(osdc, req, force_resend);
                dout("__map_request returned %d\n", err);
                if (err == 0)
                        continue;  /* no change and no osd was specified */
                if (err < 0)
                        continue;  /* hrm! */
                if (req->r_osd == NULL) {
                        dout("tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
                        continue;
                }

                dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
                     req->r_osd ? req->r_osd->o_osd : -1);
                __register_request(osdc, req);
                __unregister_linger_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);

        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
        reset_changed_osds(osdc);
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
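/*
 * Message layout, as decoded below:
 *	fsid
 *	u32 nr_incremental_maps, then nr x { u32 epoch, u32 len, data }
 *	u32 nr_full_maps,        then nr x { u32 epoch, u32 len, data }
 */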
1359void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1360{
1361        void *p, *end, *next;
1362        u32 nr_maps, maplen;
1363        u32 epoch;
1364        struct ceph_osdmap *newmap = NULL, *oldmap;
1365        int err;
1366        struct ceph_fsid fsid;
1367
1368        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1369        p = msg->front.iov_base;
1370        end = p + msg->front.iov_len;
1371
1372        /* verify fsid */
1373        ceph_decode_need(&p, end, sizeof(fsid), bad);
1374        ceph_decode_copy(&p, &fsid, sizeof(fsid));
1375        if (ceph_check_fsid(osdc->client, &fsid) < 0)
1376                return;
1377
1378        down_write(&osdc->map_sem);
1379
1380        /* incremental maps */
1381        ceph_decode_32_safe(&p, end, nr_maps, bad);
1382        dout(" %d inc maps\n", nr_maps);
1383        while (nr_maps > 0) {
1384                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1385                epoch = ceph_decode_32(&p);
1386                maplen = ceph_decode_32(&p);
1387                ceph_decode_need(&p, end, maplen, bad);
1388                next = p + maplen;
1389                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1390                        dout("applying incremental map %u len %d\n",
1391                             epoch, maplen);
1392                        newmap = osdmap_apply_incremental(&p, next,
1393                                                          osdc->osdmap,
1394                                                          &osdc->client->msgr);
1395                        if (IS_ERR(newmap)) {
1396                                err = PTR_ERR(newmap);
1397                                goto bad;
1398                        }
1399                        BUG_ON(!newmap);
1400                        if (newmap != osdc->osdmap) {
1401                                ceph_osdmap_destroy(osdc->osdmap);
1402                                osdc->osdmap = newmap;
1403                        }
1404                        kick_requests(osdc, 0);
1405                } else {
1406                        dout("ignoring incremental map %u len %d\n",
1407                             epoch, maplen);
1408                }
1409                p = next;
1410                nr_maps--;
1411        }
1412        if (newmap)
1413                goto done;
1414
1415        /* full maps */
1416        ceph_decode_32_safe(&p, end, nr_maps, bad);
1417        dout(" %d full maps\n", nr_maps);
1418        while (nr_maps) {
1419                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1420                epoch = ceph_decode_32(&p);
1421                maplen = ceph_decode_32(&p);
1422                ceph_decode_need(&p, end, maplen, bad);
1423                if (nr_maps > 1) {
1424                        dout("skipping non-latest full map %u len %d\n",
1425                             epoch, maplen);
1426                } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1427                        dout("skipping full map %u len %d, "
1428                             "older than our %u\n", epoch, maplen,
1429                             osdc->osdmap->epoch);
1430                } else {
1431                        int skipped_map = 0;
1432
1433                        dout("taking full map %u len %d\n", epoch, maplen);
1434                        newmap = osdmap_decode(&p, p+maplen);
1435                        if (IS_ERR(newmap)) {
1436                                err = PTR_ERR(newmap);
1437                                goto bad;
1438                        }
1439                        BUG_ON(!newmap);
1440                        oldmap = osdc->osdmap;
1441                        osdc->osdmap = newmap;
1442                        if (oldmap) {
1443                                if (oldmap->epoch + 1 < newmap->epoch)
1444                                        skipped_map = 1;
1445                                ceph_osdmap_destroy(oldmap);
1446                        }
1447                        kick_requests(osdc, skipped_map);
1448                }
1449                p += maplen;
1450                nr_maps--;
1451        }
1452
1453done:
1454        downgrade_write(&osdc->map_sem);
1455        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1456
1457        /*
1458         * subscribe to subsequent osdmap updates if full to ensure
1459         * we find out when we are no longer full and stop returning
1460         * ENOSPC.
1461         */
1462        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1463                ceph_monc_request_next_osdmap(&osdc->client->monc);
1464
1465        send_queued(osdc);
1466        up_read(&osdc->map_sem);
1467        wake_up_all(&osdc->client->auth_wq);
1468        return;
1469
1470bad:
1471        up_write(&osdc->map_sem);
1472bad_nosem:   /* reached when the fsid decode fails, before down_write() */
1473        pr_err("osdc handle_map corrupt msg\n");
1474        ceph_msg_dump(msg);
1475}
1476
1477/*
1478 * watch/notify callback event infrastructure
1479 *
1480 * These callbacks are used both for watch and notify operations.
1481 */
1482static void __release_event(struct kref *kref)
1483{
1484        struct ceph_osd_event *event =
1485                container_of(kref, struct ceph_osd_event, kref);
1486
1487        dout("__release_event %p\n", event);
1488        kfree(event);
1489}
1490
1491static void get_event(struct ceph_osd_event *event)
1492{
1493        kref_get(&event->kref);
1494}
1495
1496void ceph_osdc_put_event(struct ceph_osd_event *event)
1497{
1498        kref_put(&event->kref, __release_event);
1499}
1500EXPORT_SYMBOL(ceph_osdc_put_event);
1501
1502static void __insert_event(struct ceph_osd_client *osdc,
1503                             struct ceph_osd_event *new)
1504{
1505        struct rb_node **p = &osdc->event_tree.rb_node;
1506        struct rb_node *parent = NULL;
1507        struct ceph_osd_event *event = NULL;
1508
1509        while (*p) {
1510                parent = *p;
1511                event = rb_entry(parent, struct ceph_osd_event, node);
1512                if (new->cookie < event->cookie)
1513                        p = &(*p)->rb_left;
1514                else if (new->cookie > event->cookie)
1515                        p = &(*p)->rb_right;
1516                else
1517                        BUG();
1518        }
1519
1520        rb_link_node(&new->node, parent, p);
1521        rb_insert_color(&new->node, &osdc->event_tree);
1522}
1523
1524static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1525                                                u64 cookie)
1526{
1527        struct rb_node **p = &osdc->event_tree.rb_node;
1528        struct rb_node *parent = NULL;
1529        struct ceph_osd_event *event = NULL;
1530
1531        while (*p) {
1532                parent = *p;
1533                event = rb_entry(parent, struct ceph_osd_event, node);
1534                if (cookie < event->cookie)
1535                        p = &(*p)->rb_left;
1536                else if (cookie > event->cookie)
1537                        p = &(*p)->rb_right;
1538                else
1539                        return event;
1540        }
1541        return NULL;
1542}
1543
1544static void __remove_event(struct ceph_osd_event *event)
1545{
1546        struct ceph_osd_client *osdc = event->osdc;
1547
1548        if (!RB_EMPTY_NODE(&event->node)) {
1549                dout("__remove_event removed %p\n", event);
1550                rb_erase(&event->node, &osdc->event_tree);
1551                ceph_osdc_put_event(event);
1552        } else {
1553                dout("__remove_event didn't remove %p\n", event);
1554        }
1555}
1556
1557int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1558                           void (*event_cb)(u64, u64, u8, void *),
1559                           int one_shot, void *data,
1560                           struct ceph_osd_event **pevent)
1561{
1562        struct ceph_osd_event *event;
1563
1564        event = kmalloc(sizeof(*event), GFP_NOIO);
1565        if (!event)
1566                return -ENOMEM;
1567
1568        dout("create_event %p\n", event);
1569        event->cb = event_cb;
1570        event->one_shot = one_shot;
1571        event->data = data;
1572        event->osdc = osdc;
1573        INIT_LIST_HEAD(&event->osd_node);
1574        RB_CLEAR_NODE(&event->node);
1575        kref_init(&event->kref);   /* one ref for us */
1576        kref_get(&event->kref);    /* one ref for the caller */
1577        init_completion(&event->completion);
1578
1579        spin_lock(&osdc->event_lock);
1580        event->cookie = ++osdc->event_count;
1581        __insert_event(osdc, event);
1582        spin_unlock(&osdc->event_lock);
1583
1584        *pevent = event;
1585        return 0;
1586}
1587EXPORT_SYMBOL(ceph_osdc_create_event);
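
/*
 * Illustrative usage sketch (editor's addition, not part of the
 * original file): pairing ceph_osdc_create_event() with
 * ceph_osdc_wait_event(), which is declared in
 * <linux/ceph/osd_client.h> and defined further below.  The callback
 * and the 10 second timeout here are hypothetical.
 */
static void example_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	pr_info("notify: ver %llu notify_id %llu opcode %u\n",
		ver, notify_id, opcode);
}

static int __maybe_unused example_watch_notify_wait(struct ceph_osd_client *osdc)
{
	struct ceph_osd_event *event;
	int err;

	/* one-shot: the event unregisters itself when the notify arrives */
	err = ceph_osdc_create_event(osdc, example_notify_cb, 1, NULL, &event);
	if (err)
		return err;

	/*
	 * event->cookie would now be handed to the OSD as part of a
	 * watch/notify op; ceph_osdc_wait_event() drops our reference.
	 */
	return ceph_osdc_wait_event(event, 10);
}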
1588
1589void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1590{
1591        struct ceph_osd_client *osdc = event->osdc;
1592
1593        dout("cancel_event %p\n", event);
1594        spin_lock(&osdc->event_lock);
1595        __remove_event(event);
1596        spin_unlock(&osdc->event_lock);
1597        ceph_osdc_put_event(event); /* caller's */
1598}
1599EXPORT_SYMBOL(ceph_osdc_cancel_event);
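
/*
 * Editor's note on event reference counting, summarizing the code
 * above: ceph_osdc_create_event() hands back an event holding two refs
 * (one owned by the rbtree, one by the caller); __remove_event() drops
 * the tree's ref, ceph_osdc_cancel_event() drops the caller's, and
 * handle_watch_notify() below takes a temporary ref that
 * do_event_work() drops once the callback has run.
 */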
1600
1601
1602static void do_event_work(struct work_struct *work)
1603{
1604        struct ceph_osd_event_work *event_work =
1605                container_of(work, struct ceph_osd_event_work, work);
1606        struct ceph_osd_event *event = event_work->event;
1607        u64 ver = event_work->ver;
1608        u64 notify_id = event_work->notify_id;
1609        u8 opcode = event_work->opcode;
1610
1611        dout("do_event_work completing %p\n", event);
1612        event->cb(ver, notify_id, opcode, event->data);
1613        complete(&event->completion);
1614        dout("do_event_work completed %p\n", event);
1615        ceph_osdc_put_event(event);
1616        kfree(event_work);
1617}
1618
1619
1620/*
1621 * Process osd watch notifications
1622 */
1623static void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1624{
1625        void *p, *end;
1626        u8 proto_ver;
1627        u64 cookie, ver, notify_id;
1628        u8 opcode;
1629        struct ceph_osd_event *event;
1630        struct ceph_osd_event_work *event_work;
1631
1632        p = msg->front.iov_base;
1633        end = p + msg->front.iov_len;
1634
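	/*
	 * Front payload layout as decoded below (editor's summary, not
	 * wire documentation from the original file):
	 *
	 *	u8    proto_ver
	 *	u8    opcode
	 *	le64  cookie      (identifies the registered event)
	 *	le64  ver
	 *	le64  notify_id
	 */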
1635        ceph_decode_8_safe(&p, end, proto_ver, bad);
1636        ceph_decode_8_safe(&p, end, opcode, bad);
1637        ceph_decode_64_safe(&p, end, cookie, bad);
1638        ceph_decode_64_safe(&p, end, ver, bad);
1639        ceph_decode_64_safe(&p, end, notify_id, bad);
1640
1641        spin_lock(&osdc->event_lock);
1642        event = __find_event(osdc, cookie);
1643        if (event) {
1644                get_event(event);
1645                if (event->one_shot)
1646                        __remove_event(event);
1647        }
1648        spin_unlock(&osdc->event_lock);
1649        dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1650             cookie, ver, event);
1651        if (event) {
1652                event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1653                if (!event_work) {
1654                        dout("ERROR: could not allocate event_work\n");
1655                        goto done_err;
1656                }
1657                INIT_WORK(&event_work->work, do_event_work);
1658                event_work->event = event;
1659                event_work->ver = ver;
1660                event_work->notify_id = notify_id;
1661                event_work->opcode = opcode;
1662                if (!queue_work(osdc->notify_wq, &event_work->work)) {
1663                        dout("WARNING: failed to queue notify event work\n");
1664                        kfree(event_work);  /* never queued; don't leak it */
1665                        goto done_err;
1666                }
1666        }
1667
1668        return;
1669
1670done_err:
1671        complete(&event->completion);
1672        ceph_osdc_put_event(event);
1673        return;
1674
1675bad:
1676        pr_err("osdc handle_watch_notify corrupt msg\n");
1677        return;
1678}
1679
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682        int err;
1683
1684        dout("wait_event %p\n", event);
1685        err = wait_for_completion_interruptible_timeout(&event->completion,
1686                                                        timeout * HZ);
1687        ceph_osdc_put_event(event);
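	/*
	 * wait_for_completion_interruptible_timeout() returns a positive
	 * value on completion, 0 on timeout and -ERESTARTSYS if
	 * interrupted; as written, a timeout is therefore reported to
	 * the caller as 0, the same as success.
	 */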
1688        if (err > 0)
1689                err = 0;
1690        dout("wait_event %p returns %d\n", event, err);
1691        return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
1694
1695/*
1696 * Register request, send initial attempt.
1697 */
1698int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1699                            struct ceph_osd_request *req,
1700                            bool nofail)
1701{
1702        int rc = 0;
1703
1704        req->r_request->pages = req->r_pages;
1705        req->r_request->nr_pages = req->r_num_pages;
1706#ifdef CONFIG_BLOCK
1707        req->r_request->bio = req->r_bio;
1708#endif
1709        req->r_request->trail = req->r_trail;
1710
1711        register_request(osdc, req);
1712
1713        down_read(&osdc->map_sem);
1714        mutex_lock(&osdc->request_mutex);
1715        /*
1716         * a racing kick_requests() may have sent the message for us
1717         * while we dropped request_mutex above, so only send now if
1718         * the request still hasn't been touched.
1719         */
1720        if (req->r_sent == 0) {
1721                rc = __map_request(osdc, req, 0);
1722                if (rc < 0) {
1723                        if (nofail) {
1724                                dout("osdc_start_request failed map,"
1725                                     " will retry %lld\n", req->r_tid);
1726                                rc = 0;
1727                        }
1728                        goto out_unlock;
1729                }
1730                if (req->r_osd == NULL) {
1731                        dout("send_request %p no up osds in pg\n", req);
1732                        ceph_monc_request_next_osdmap(&osdc->client->monc);
1733                } else {
1734                        __send_request(osdc, req);
1735                }
1736                rc = 0;
1737        }
1738
1739out_unlock:
1740        mutex_unlock(&osdc->request_mutex);
1741        up_read(&osdc->map_sem);
1742        return rc;
1743}
1744EXPORT_SYMBOL(ceph_osdc_start_request);
1745
1746/*
1747 * wait for a request to complete
1748 */
1749int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1750                           struct ceph_osd_request *req)
1751{
1752        int rc;
1753
1754        rc = wait_for_completion_interruptible(&req->r_completion);
1755        if (rc < 0) {
1756                mutex_lock(&osdc->request_mutex);
1757                __cancel_request(req);
1758                __unregister_request(osdc, req);
1759                mutex_unlock(&osdc->request_mutex);
1760                complete_request(req);
1761                dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1762                return rc;
1763        }
1764
1765        dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1766        return req->r_result;
1767}
1768EXPORT_SYMBOL(ceph_osdc_wait_request);
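
/*
 * Illustrative sketch (editor's addition, not part of the original
 * file): the usual submit-and-wait pattern built from the two helpers
 * above.  "req" is assumed to have been prepared with
 * ceph_osdc_new_request().
 */
static int __maybe_unused example_submit_and_wait(struct ceph_osd_client *osdc,
						  struct ceph_osd_request *req)
{
	int err;

	err = ceph_osdc_start_request(osdc, req, false);
	if (err)
		return err;
	/* returns req->r_result, or a negative error if interrupted */
	return ceph_osdc_wait_request(osdc, req);
}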
1769
1770/*
1771 * sync - wait for writes issued before entry to flush; newer tids can't starve us.
1772 */
1773void ceph_osdc_sync(struct ceph_osd_client *osdc)
1774{
1775        struct ceph_osd_request *req;
1776        u64 last_tid, next_tid = 0;
1777
1778        mutex_lock(&osdc->request_mutex);
1779        last_tid = osdc->last_tid;
1780        while (1) {
1781                req = __lookup_request_ge(osdc, next_tid);
1782                if (!req)
1783                        break;
1784                if (req->r_tid > last_tid)
1785                        break;
1786
1787                next_tid = req->r_tid + 1;
1788                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1789                        continue;
1790
1791                ceph_osdc_get_request(req);
1792                mutex_unlock(&osdc->request_mutex);
1793                dout("sync waiting on tid %llu (last is %llu)\n",
1794                     req->r_tid, last_tid);
1795                wait_for_completion(&req->r_safe_completion);
1796                mutex_lock(&osdc->request_mutex);
1797                ceph_osdc_put_request(req);
1798        }
1799        mutex_unlock(&osdc->request_mutex);
1800        dout("sync done (thru tid %llu)\n", last_tid);
1801}
1802EXPORT_SYMBOL(ceph_osdc_sync);
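
/*
 * Editor's note: the loop above skips reads (no CEPH_OSD_FLAG_WRITE)
 * and waits on r_safe_completion, so ceph_osdc_sync() returns only
 * once each sampled write has been committed to stable storage by the
 * OSDs, not merely acknowledged.
 */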
1803
1804/*
1805 * init, shutdown
1806 */
1807int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1808{
1809        int err;
1810
1811        dout("init\n");
1812        osdc->client = client;
1813        osdc->osdmap = NULL;
1814        init_rwsem(&osdc->map_sem);
1815        init_completion(&osdc->map_waiters);
1816        osdc->last_requested_map = 0;
1817        mutex_init(&osdc->request_mutex);
1818        osdc->last_tid = 0;
1819        osdc->osds = RB_ROOT;
1820        INIT_LIST_HEAD(&osdc->osd_lru);
1821        osdc->requests = RB_ROOT;
1822        INIT_LIST_HEAD(&osdc->req_lru);
1823        INIT_LIST_HEAD(&osdc->req_unsent);
1824        INIT_LIST_HEAD(&osdc->req_notarget);
1825        INIT_LIST_HEAD(&osdc->req_linger);
1826        osdc->num_requests = 0;
1827        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1828        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1829        spin_lock_init(&osdc->event_lock);
1830        osdc->event_tree = RB_ROOT;
1831        osdc->event_count = 0;
1832
1833        schedule_delayed_work(&osdc->osds_timeout_work,
1834           round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1835
1836        err = -ENOMEM;
1837        osdc->req_mempool = mempool_create_kmalloc_pool(10,
1838                                        sizeof(struct ceph_osd_request));
1839        if (!osdc->req_mempool)
1840                goto out;
1841
1842        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1843                                OSD_OP_FRONT_LEN, 10, true,
1844                                "osd_op");
1845        if (err < 0)
1846                goto out_mempool;
1847        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1848                                OSD_OPREPLY_FRONT_LEN, 10, true,
1849                                "osd_op_reply");
1850        if (err < 0)
1851                goto out_msgpool;
1852
1853        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1854        if (!osdc->notify_wq) {
1855                /* create_singlethread_workqueue() returns NULL on failure */
1856                err = -ENOMEM;
1857                goto out_msgpool;
1858        }
1859        return 0;
1860
1861out_msgpool:
1862        ceph_msgpool_destroy(&osdc->msgpool_op);
1863out_mempool:
1864        mempool_destroy(osdc->req_mempool);
1865out:
1866        return err;
1867}
1868EXPORT_SYMBOL(ceph_osdc_init);
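
/*
 * Illustrative sketch (editor's addition, not part of the original
 * file): ceph_osdc_init() pairs with ceph_osdc_stop() below in the
 * ceph_client lifecycle; error handling abbreviated.
 */
static int __maybe_unused example_osdc_lifecycle(struct ceph_client *client,
						 struct ceph_osd_client *osdc)
{
	int err;

	err = ceph_osdc_init(osdc, client);
	if (err)
		return err;
	/* ... submit requests ... */
	ceph_osdc_stop(osdc);
	return 0;
}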
1869
1870void ceph_osdc_stop(struct ceph_osd_client *osdc)
1871{
1872        flush_workqueue(osdc->notify_wq);
1873        destroy_workqueue(osdc->notify_wq);
1874        cancel_delayed_work_sync(&osdc->timeout_work);
1875        cancel_delayed_work_sync(&osdc->osds_timeout_work);
1876        if (osdc->osdmap) {
1877                ceph_osdmap_destroy(osdc->osdmap);
1878                osdc->osdmap = NULL;
1879        }
1880        remove_all_osds(osdc);
1881        mempool_destroy(osdc->req_mempool);
1882        ceph_msgpool_destroy(&osdc->msgpool_op);
1883        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1884}
1885EXPORT_SYMBOL(ceph_osdc_stop);
1886
1887/*
1888 * Read some contiguous pages.  If we cross a stripe boundary, shorten
1889 * *plen.  Return number of bytes read, or error.
1890 */
1891int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1892                        struct ceph_vino vino, struct ceph_file_layout *layout,
1893                        u64 off, u64 *plen,
1894                        u32 truncate_seq, u64 truncate_size,
1895                        struct page **pages, int num_pages, int page_align)
1896{
1897        struct ceph_osd_request *req;
1898        int rc = 0;
1899
1900        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1901             vino.snap, off, *plen);
1902        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1903                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1904                                    NULL, 0, truncate_seq, truncate_size, NULL,
1905                                    false, 1, page_align);
1906        if (IS_ERR(req))
1907                return PTR_ERR(req);
1908
1909        /* it may be a short read due to an object boundary */
1910        req->r_pages = pages;
1911
1912        dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1913             off, *plen, req->r_num_pages, page_align);
1914
1915        rc = ceph_osdc_start_request(osdc, req, false);
1916        if (!rc)
1917                rc = ceph_osdc_wait_request(osdc, req);
1918
1919        ceph_osdc_put_request(req);
1920        dout("readpages result %d\n", rc);
1921        return rc;
1922}
1923EXPORT_SYMBOL(ceph_osdc_readpages);
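
/*
 * Illustrative sketch (editor's addition, not part of the original
 * file): reading a single page with ceph_osdc_readpages().  The vino
 * and layout are assumed to come from the caller's inode; zero
 * truncate_seq/truncate_size mean no truncation is in progress.
 */
static int __maybe_unused example_read_one_page(struct ceph_osd_client *osdc,
						struct ceph_vino vino,
						struct ceph_file_layout *layout,
						u64 off, struct page *page)
{
	u64 len = PAGE_SIZE;

	/* len may come back shortened at an object boundary */
	return ceph_osdc_readpages(osdc, vino, layout, off, &len,
				   0, 0, &page, 1, off & ~PAGE_MASK);
}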
1924
1925/*
1926 * do a synchronous write on N pages
1927 */
1928int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1929                         struct ceph_file_layout *layout,
1930                         struct ceph_snap_context *snapc,
1931                         u64 off, u64 len,
1932                         u32 truncate_seq, u64 truncate_size,
1933                         struct timespec *mtime,
1934                         struct page **pages, int num_pages,
1935                         int flags, int do_sync, bool nofail)
1936{
1937        struct ceph_osd_request *req;
1938        int rc = 0;
1939        int page_align = off & ~PAGE_MASK;
1940
1941        BUG_ON(vino.snap != CEPH_NOSNAP);
1942        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1943                                    CEPH_OSD_OP_WRITE,
1944                                    flags | CEPH_OSD_FLAG_ONDISK |
1945                                            CEPH_OSD_FLAG_WRITE,
1946                                    snapc, do_sync,
1947                                    truncate_seq, truncate_size, mtime,
1948                                    nofail, 1, page_align);
1949        if (IS_ERR(req))
1950                return PTR_ERR(req);
1951
1952        /* it may be a short write due to an object boundary */
1953        req->r_pages = pages;
1954        dout("writepages %llu~%llu (%d pages)\n", off, len,
1955             req->r_num_pages);
1956
1957        rc = ceph_osdc_start_request(osdc, req, nofail);
1958        if (!rc)
1959                rc = ceph_osdc_wait_request(osdc, req);
1960
1961        ceph_osdc_put_request(req);
1962        if (rc == 0)
1963                rc = len;
1964        dout("writepages result %d\n", rc);
1965        return rc;
1966}
1967EXPORT_SYMBOL(ceph_osdc_writepages);
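
/*
 * Editor's note: the write-side helper above mirrors
 * ceph_osdc_readpages() but requires vino.snap == CEPH_NOSNAP, always
 * ORs in CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_WRITE, and returns the
 * number of bytes written rather than 0 on success.
 */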
1968
1969/*
1970 * handle incoming message
1971 */
1972static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1973{
1974        struct ceph_osd *osd = con->private;
1975        struct ceph_osd_client *osdc;
1976        int type = le16_to_cpu(msg->hdr.type);
1977
1978        if (!osd)
1979                goto out;
1980        osdc = osd->o_osdc;
1981
1982        switch (type) {
1983        case CEPH_MSG_OSD_MAP:
1984                ceph_osdc_handle_map(osdc, msg);
1985                break;
1986        case CEPH_MSG_OSD_OPREPLY:
1987                handle_reply(osdc, msg, con);
1988                break;
1989        case CEPH_MSG_WATCH_NOTIFY:
1990                handle_watch_notify(osdc, msg);
1991                break;
1992
1993        default:
1994                pr_err("received unknown message type %d %s\n", type,
1995                       ceph_msg_type_name(type));
1996        }
1997out:
1998        ceph_msg_put(msg);
1999}
2000
2001/*
2002 * lookup and return message for incoming reply.  set up reply message
2003 * pages.
2004 */
2005static struct ceph_msg *get_reply(struct ceph_connection *con,
2006                                  struct ceph_msg_header *hdr,
2007                                  int *skip)
2008{
2009        struct ceph_osd *osd = con->private;
2010        struct ceph_osd_client *osdc = osd->o_osdc;
2011        struct ceph_msg *m;
2012        struct ceph_osd_request *req;
2013        int front = le32_to_cpu(hdr->front_len);
2014        int data_len = le32_to_cpu(hdr->data_len);
2015        u64 tid;
2016
2017        tid = le64_to_cpu(hdr->tid);
2018        mutex_lock(&osdc->request_mutex);
2019        req = __lookup_request(osdc, tid);
2020        if (!req) {
2021                *skip = 1;
2022                m = NULL;
2023                dout("get_reply unknown tid %llu from osd%d\n", tid,
2024                     osd->o_osd);
2025                goto out;
2026        }
2027
2028        if (req->r_con_filling_msg) {
2029                dout("%s revoking msg %p from old con %p\n", __func__,
2030                     req->r_reply, req->r_con_filling_msg);
2031                ceph_msg_revoke_incoming(req->r_reply);
2032                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2033                req->r_con_filling_msg = NULL;
2034        }
2035
2036        if (front > req->r_reply->front.iov_len) {
2037                pr_warning("get_reply front %d > preallocated %d\n",
2038                           front, (int)req->r_reply->front.iov_len);
2039                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2040                if (!m)
2041                        goto out;
2042                ceph_msg_put(req->r_reply);
2043                req->r_reply = m;
2044        }
2045        m = ceph_msg_get(req->r_reply);
2046
2047        if (data_len > 0) {
2048                int want = calc_pages_for(req->r_page_alignment, data_len);
2049
2050                if (unlikely(req->r_num_pages < want)) {
2051                        pr_warning("tid %lld reply has %d bytes %d pages, we"
2052                                   " had only %d pages ready\n", tid, data_len,
2053                                   want, req->r_num_pages);
2054                        *skip = 1;
2055                        ceph_msg_put(m);
2056                        m = NULL;
2057                        goto out;
2058                }
2059                m->pages = req->r_pages;
2060                m->nr_pages = req->r_num_pages;
2061                m->page_alignment = req->r_page_alignment;
2062#ifdef CONFIG_BLOCK
2063                m->bio = req->r_bio;
2064#endif
2065        }
2066        *skip = 0;
2067        req->r_con_filling_msg = con->ops->get(con);
2068        dout("get_reply tid %lld %p\n", tid, m);
2069
2070out:
2071        mutex_unlock(&osdc->request_mutex);
2072        return m;
2073
2074}
2075
2076static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2077                                  struct ceph_msg_header *hdr,
2078                                  int *skip)
2079{
2080        struct ceph_osd *osd = con->private;
2081        int type = le16_to_cpu(hdr->type);
2082        int front = le32_to_cpu(hdr->front_len);
2083
2084        *skip = 0;
2085        switch (type) {
2086        case CEPH_MSG_OSD_MAP:
2087        case CEPH_MSG_WATCH_NOTIFY:
2088                return ceph_msg_new(type, front, GFP_NOFS, false);
2089        case CEPH_MSG_OSD_OPREPLY:
2090                return get_reply(con, hdr, skip);
2091        default:
2092                pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2093                        osd->o_osd);
2094                *skip = 1;
2095                return NULL;
2096        }
2097}
2098
2099/*
2100 * Wrappers to refcount containing ceph_osd struct
2101 */
2102static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2103{
2104        struct ceph_osd *osd = con->private;
2105        if (get_osd(osd))
2106                return con;
2107        return NULL;
2108}
2109
2110static void put_osd_con(struct ceph_connection *con)
2111{
2112        struct ceph_osd *osd = con->private;
2113        put_osd(osd);
2114}
2115
2116/*
2117 * authentication
2118 */
2119/*
2120 * Note: returned pointer is the address of a structure that's
2121 * managed separately.  Caller must *not* attempt to free it.
2122 */
2123static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2124                                        int *proto, int force_new)
2125{
2126        struct ceph_osd *o = con->private;
2127        struct ceph_osd_client *osdc = o->o_osdc;
2128        struct ceph_auth_client *ac = osdc->client->monc.auth;
2129        struct ceph_auth_handshake *auth = &o->o_auth;
2130
2131        if (force_new && auth->authorizer) {
2132                if (ac->ops && ac->ops->destroy_authorizer)
2133                        ac->ops->destroy_authorizer(ac, auth->authorizer);
2134                auth->authorizer = NULL;
2135        }
2136        if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2137                int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2138                                                        auth);
2139                if (ret)
2140                        return ERR_PTR(ret);
2141        }
2142        *proto = ac->protocol;
2143
2144        return auth;
2145}
2146
2147
2148static int verify_authorizer_reply(struct ceph_connection *con, int len)
2149{
2150        struct ceph_osd *o = con->private;
2151        struct ceph_osd_client *osdc = o->o_osdc;
2152        struct ceph_auth_client *ac = osdc->client->monc.auth;
2153
2154        /*
2155         * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2156         * XXX which do we do:  succeed or fail?
2157         */
2158        return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2159}
2160
2161static int invalidate_authorizer(struct ceph_connection *con)
2162{
2163        struct ceph_osd *o = con->private;
2164        struct ceph_osd_client *osdc = o->o_osdc;
2165        struct ceph_auth_client *ac = osdc->client->monc.auth;
2166
2167        if (ac->ops && ac->ops->invalidate_authorizer)
2168                ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2169
2170        return ceph_monc_validate_auth(&osdc->client->monc);
2171}
2172
2173static const struct ceph_connection_operations osd_con_ops = {
2174        .get = get_osd_con,
2175        .put = put_osd_con,
2176        .dispatch = dispatch,
2177        .get_authorizer = get_authorizer,
2178        .verify_authorizer_reply = verify_authorizer_reply,
2179        .invalidate_authorizer = invalidate_authorizer,
2180        .alloc_msg = alloc_msg,
2181        .fault = osd_reset,
2182};
2183