linux/net/ceph/mon_client.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/types.h>
   5#include <linux/slab.h>
   6#include <linux/random.h>
   7#include <linux/sched.h>
   8
   9#include <linux/ceph/mon_client.h>
  10#include <linux/ceph/libceph.h>
  11#include <linux/ceph/debugfs.h>
  12#include <linux/ceph/decode.h>
  13#include <linux/ceph/auth.h>
  14
  15/*
  16 * Interact with Ceph monitor cluster.  Handle requests for new map
  17 * versions, and periodically resend as needed.  Also implement
  18 * statfs() and umount().
  19 *
  20 * A small cluster of Ceph "monitors" is responsible for managing critical
  21 * cluster configuration and state information.  An odd number (e.g., 3, 5)
  22 * of cmon daemons use a modified version of the Paxos part-time parliament
  23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
  24 * list of clients who have mounted the file system.
  25 *
  26 * We maintain an open, active session with a monitor at all times in order to
  27 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
  28 * TCP socket to ensure we detect a failure.  If the connection does break, we
  29 * randomly hunt for a new monitor.  Once the connection is reestablished, we
  30 * resend any outstanding requests.
  31 */
  32
  33static const struct ceph_connection_operations mon_con_ops;
  34
  35static int __validate_auth(struct ceph_mon_client *monc);
  36
  37/*
  38 * Decode a monmap blob (e.g., during mount).
  39 */
  40struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
  41{
  42        struct ceph_monmap *m = NULL;
  43        int i, err = -EINVAL;
  44        struct ceph_fsid fsid;
  45        u32 epoch, num_mon;
  46        u16 version;
  47        u32 len;
  48
  49        ceph_decode_32_safe(&p, end, len, bad);
  50        ceph_decode_need(&p, end, len, bad);
  51
  52        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
  53
  54        ceph_decode_16_safe(&p, end, version, bad);
  55
  56        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
  57        ceph_decode_copy(&p, &fsid, sizeof(fsid));
  58        epoch = ceph_decode_32(&p);
  59
  60        num_mon = ceph_decode_32(&p);
  61        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
  62
  63        if (num_mon >= CEPH_MAX_MON)
  64                goto bad;
  65        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
  66        if (m == NULL)
  67                return ERR_PTR(-ENOMEM);
  68        m->fsid = fsid;
  69        m->epoch = epoch;
  70        m->num_mon = num_mon;
  71        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
  72        for (i = 0; i < num_mon; i++)
  73                ceph_decode_addr(&m->mon_inst[i].addr);
  74
  75        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
  76             m->num_mon);
  77        for (i = 0; i < m->num_mon; i++)
  78                dout("monmap_decode  mon%d is %s\n", i,
  79                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
  80        return m;
  81
  82bad:
  83        dout("monmap_decode failed with %d\n", err);
  84        kfree(m);
  85        return ERR_PTR(err);
  86}
  87
  88/*
  89 * return true if *addr is included in the monmap.
  90 */
  91int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
  92{
  93        int i;
  94
  95        for (i = 0; i < m->num_mon; i++)
  96                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
  97                        return 1;
  98        return 0;
  99}
 100
 101/*
 102 * Send an auth request.
 103 */
 104static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
 105{
 106        monc->pending_auth = 1;
 107        monc->m_auth->front.iov_len = len;
 108        monc->m_auth->hdr.front_len = cpu_to_le32(len);
 109        ceph_con_revoke(monc->con, monc->m_auth);
 110        ceph_msg_get(monc->m_auth);  /* keep our ref */
 111        ceph_con_send(monc->con, monc->m_auth);
 112}
 113
 114/*
 115 * Close monitor session, if any.
 116 */
 117static void __close_session(struct ceph_mon_client *monc)
 118{
 119        dout("__close_session closing mon%d\n", monc->cur_mon);
 120        ceph_con_revoke(monc->con, monc->m_auth);
 121        ceph_con_close(monc->con);
 122        monc->cur_mon = -1;
 123        monc->pending_auth = 0;
 124        ceph_auth_reset(monc->auth);
 125}
 126
 127/*
 128 * Open a session with a (new) monitor.
 129 */
 130static int __open_session(struct ceph_mon_client *monc)
 131{
 132        char r;
 133        int ret;
 134
 135        if (monc->cur_mon < 0) {
 136                get_random_bytes(&r, 1);
 137                monc->cur_mon = r % monc->monmap->num_mon;
 138                dout("open_session num=%d r=%d -> mon%d\n",
 139                     monc->monmap->num_mon, r, monc->cur_mon);
 140                monc->sub_sent = 0;
 141                monc->sub_renew_after = jiffies;  /* i.e., expired */
 142                monc->want_next_osdmap = !!monc->want_next_osdmap;
 143
 144                dout("open_session mon%d opening\n", monc->cur_mon);
 145                monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
 146                monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
 147                ceph_con_open(monc->con,
 148                              &monc->monmap->mon_inst[monc->cur_mon].addr);
 149
 150                /* initiatiate authentication handshake */
 151                ret = ceph_auth_build_hello(monc->auth,
 152                                            monc->m_auth->front.iov_base,
 153                                            monc->m_auth->front_max);
 154                __send_prepared_auth_request(monc, ret);
 155        } else {
 156                dout("open_session mon%d already open\n", monc->cur_mon);
 157        }
 158        return 0;
 159}
 160
 161static bool __sub_expired(struct ceph_mon_client *monc)
 162{
 163        return time_after_eq(jiffies, monc->sub_renew_after);
 164}
 165
 166/*
 167 * Reschedule delayed work timer.
 168 */
 169static void __schedule_delayed(struct ceph_mon_client *monc)
 170{
 171        unsigned delay;
 172
 173        if (monc->cur_mon < 0 || __sub_expired(monc))
 174                delay = 10 * HZ;
 175        else
 176                delay = 20 * HZ;
 177        dout("__schedule_delayed after %u\n", delay);
 178        schedule_delayed_work(&monc->delayed_work, delay);
 179}
 180
 181/*
 182 * Send subscribe request for mdsmap and/or osdmap.
 183 */
 184static void __send_subscribe(struct ceph_mon_client *monc)
 185{
 186        dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
 187             (unsigned)monc->sub_sent, __sub_expired(monc),
 188             monc->want_next_osdmap);
 189        if ((__sub_expired(monc) && !monc->sub_sent) ||
 190            monc->want_next_osdmap == 1) {
 191                struct ceph_msg *msg = monc->m_subscribe;
 192                struct ceph_mon_subscribe_item *i;
 193                void *p, *end;
 194                int num;
 195
 196                p = msg->front.iov_base;
 197                end = p + msg->front_max;
 198
 199                num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
 200                ceph_encode_32(&p, num);
 201
 202                if (monc->want_next_osdmap) {
 203                        dout("__send_subscribe to 'osdmap' %u\n",
 204                             (unsigned)monc->have_osdmap);
 205                        ceph_encode_string(&p, end, "osdmap", 6);
 206                        i = p;
 207                        i->have = cpu_to_le64(monc->have_osdmap);
 208                        i->onetime = 1;
 209                        p += sizeof(*i);
 210                        monc->want_next_osdmap = 2;  /* requested */
 211                }
 212                if (monc->want_mdsmap) {
 213                        dout("__send_subscribe to 'mdsmap' %u+\n",
 214                             (unsigned)monc->have_mdsmap);
 215                        ceph_encode_string(&p, end, "mdsmap", 6);
 216                        i = p;
 217                        i->have = cpu_to_le64(monc->have_mdsmap);
 218                        i->onetime = 0;
 219                        p += sizeof(*i);
 220                }
 221                ceph_encode_string(&p, end, "monmap", 6);
 222                i = p;
 223                i->have = 0;
 224                i->onetime = 0;
 225                p += sizeof(*i);
 226
 227                msg->front.iov_len = p - msg->front.iov_base;
 228                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 229                ceph_con_revoke(monc->con, msg);
 230                ceph_con_send(monc->con, ceph_msg_get(msg));
 231
 232                monc->sub_sent = jiffies | 1;  /* never 0 */
 233        }
 234}
 235
 236static void handle_subscribe_ack(struct ceph_mon_client *monc,
 237                                 struct ceph_msg *msg)
 238{
 239        unsigned seconds;
 240        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
 241
 242        if (msg->front.iov_len < sizeof(*h))
 243                goto bad;
 244        seconds = le32_to_cpu(h->duration);
 245
 246        mutex_lock(&monc->mutex);
 247        if (monc->hunting) {
 248                pr_info("mon%d %s session established\n",
 249                        monc->cur_mon,
 250                        ceph_pr_addr(&monc->con->peer_addr.in_addr));
 251                monc->hunting = false;
 252        }
 253        dout("handle_subscribe_ack after %d seconds\n", seconds);
 254        monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
 255        monc->sub_sent = 0;
 256        mutex_unlock(&monc->mutex);
 257        return;
 258bad:
 259        pr_err("got corrupt subscribe-ack msg\n");
 260        ceph_msg_dump(msg);
 261}
 262
 263/*
 264 * Keep track of which maps we have
 265 */
 266int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
 267{
 268        mutex_lock(&monc->mutex);
 269        monc->have_mdsmap = got;
 270        mutex_unlock(&monc->mutex);
 271        return 0;
 272}
 273EXPORT_SYMBOL(ceph_monc_got_mdsmap);
 274
 275int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
 276{
 277        mutex_lock(&monc->mutex);
 278        monc->have_osdmap = got;
 279        monc->want_next_osdmap = 0;
 280        mutex_unlock(&monc->mutex);
 281        return 0;
 282}
 283
 284/*
 285 * Register interest in the next osdmap
 286 */
 287void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 288{
 289        dout("request_next_osdmap have %u\n", monc->have_osdmap);
 290        mutex_lock(&monc->mutex);
 291        if (!monc->want_next_osdmap)
 292                monc->want_next_osdmap = 1;
 293        if (monc->want_next_osdmap < 2)
 294                __send_subscribe(monc);
 295        mutex_unlock(&monc->mutex);
 296}
 297
 298/*
 299 *
 300 */
 301int ceph_monc_open_session(struct ceph_mon_client *monc)
 302{
 303        mutex_lock(&monc->mutex);
 304        __open_session(monc);
 305        __schedule_delayed(monc);
 306        mutex_unlock(&monc->mutex);
 307        return 0;
 308}
 309EXPORT_SYMBOL(ceph_monc_open_session);
 310
 311/*
 312 * The monitor responds with mount ack indicate mount success.  The
 313 * included client ticket allows the client to talk to MDSs and OSDs.
 314 */
 315static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 316                                 struct ceph_msg *msg)
 317{
 318        struct ceph_client *client = monc->client;
 319        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
 320        void *p, *end;
 321
 322        mutex_lock(&monc->mutex);
 323
 324        dout("handle_monmap\n");
 325        p = msg->front.iov_base;
 326        end = p + msg->front.iov_len;
 327
 328        monmap = ceph_monmap_decode(p, end);
 329        if (IS_ERR(monmap)) {
 330                pr_err("problem decoding monmap, %d\n",
 331                       (int)PTR_ERR(monmap));
 332                goto out;
 333        }
 334
 335        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
 336                kfree(monmap);
 337                goto out;
 338        }
 339
 340        client->monc.monmap = monmap;
 341        kfree(old);
 342
 343        if (!client->have_fsid) {
 344                client->have_fsid = true;
 345                mutex_unlock(&monc->mutex);
 346                /*
 347                 * do debugfs initialization without mutex to avoid
 348                 * creating a locking dependency
 349                 */
 350                ceph_debugfs_client_init(client);
 351                goto out_unlocked;
 352        }
 353out:
 354        mutex_unlock(&monc->mutex);
 355out_unlocked:
 356        wake_up_all(&client->auth_wq);
 357}
 358
 359/*
 360 * generic requests (e.g., statfs, poolop)
 361 */
 362static struct ceph_mon_generic_request *__lookup_generic_req(
 363        struct ceph_mon_client *monc, u64 tid)
 364{
 365        struct ceph_mon_generic_request *req;
 366        struct rb_node *n = monc->generic_request_tree.rb_node;
 367
 368        while (n) {
 369                req = rb_entry(n, struct ceph_mon_generic_request, node);
 370                if (tid < req->tid)
 371                        n = n->rb_left;
 372                else if (tid > req->tid)
 373                        n = n->rb_right;
 374                else
 375                        return req;
 376        }
 377        return NULL;
 378}
 379
 380static void __insert_generic_request(struct ceph_mon_client *monc,
 381                            struct ceph_mon_generic_request *new)
 382{
 383        struct rb_node **p = &monc->generic_request_tree.rb_node;
 384        struct rb_node *parent = NULL;
 385        struct ceph_mon_generic_request *req = NULL;
 386
 387        while (*p) {
 388                parent = *p;
 389                req = rb_entry(parent, struct ceph_mon_generic_request, node);
 390                if (new->tid < req->tid)
 391                        p = &(*p)->rb_left;
 392                else if (new->tid > req->tid)
 393                        p = &(*p)->rb_right;
 394                else
 395                        BUG();
 396        }
 397
 398        rb_link_node(&new->node, parent, p);
 399        rb_insert_color(&new->node, &monc->generic_request_tree);
 400}
 401
 402static void release_generic_request(struct kref *kref)
 403{
 404        struct ceph_mon_generic_request *req =
 405                container_of(kref, struct ceph_mon_generic_request, kref);
 406
 407        if (req->reply)
 408                ceph_msg_put(req->reply);
 409        if (req->request)
 410                ceph_msg_put(req->request);
 411
 412        kfree(req);
 413}
 414
 415static void put_generic_request(struct ceph_mon_generic_request *req)
 416{
 417        kref_put(&req->kref, release_generic_request);
 418}
 419
 420static void get_generic_request(struct ceph_mon_generic_request *req)
 421{
 422        kref_get(&req->kref);
 423}
 424
 425static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 426                                         struct ceph_msg_header *hdr,
 427                                         int *skip)
 428{
 429        struct ceph_mon_client *monc = con->private;
 430        struct ceph_mon_generic_request *req;
 431        u64 tid = le64_to_cpu(hdr->tid);
 432        struct ceph_msg *m;
 433
 434        mutex_lock(&monc->mutex);
 435        req = __lookup_generic_req(monc, tid);
 436        if (!req) {
 437                dout("get_generic_reply %lld dne\n", tid);
 438                *skip = 1;
 439                m = NULL;
 440        } else {
 441                dout("get_generic_reply %lld got %p\n", tid, req->reply);
 442                m = ceph_msg_get(req->reply);
 443                /*
 444                 * we don't need to track the connection reading into
 445                 * this reply because we only have one open connection
 446                 * at a time, ever.
 447                 */
 448        }
 449        mutex_unlock(&monc->mutex);
 450        return m;
 451}
 452
 453static int do_generic_request(struct ceph_mon_client *monc,
 454                              struct ceph_mon_generic_request *req)
 455{
 456        int err;
 457
 458        /* register request */
 459        mutex_lock(&monc->mutex);
 460        req->tid = ++monc->last_tid;
 461        req->request->hdr.tid = cpu_to_le64(req->tid);
 462        __insert_generic_request(monc, req);
 463        monc->num_generic_requests++;
 464        ceph_con_send(monc->con, ceph_msg_get(req->request));
 465        mutex_unlock(&monc->mutex);
 466
 467        err = wait_for_completion_interruptible(&req->completion);
 468
 469        mutex_lock(&monc->mutex);
 470        rb_erase(&req->node, &monc->generic_request_tree);
 471        monc->num_generic_requests--;
 472        mutex_unlock(&monc->mutex);
 473
 474        if (!err)
 475                err = req->result;
 476        return err;
 477}
 478
 479/*
 480 * statfs
 481 */
 482static void handle_statfs_reply(struct ceph_mon_client *monc,
 483                                struct ceph_msg *msg)
 484{
 485        struct ceph_mon_generic_request *req;
 486        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
 487        u64 tid = le64_to_cpu(msg->hdr.tid);
 488
 489        if (msg->front.iov_len != sizeof(*reply))
 490                goto bad;
 491        dout("handle_statfs_reply %p tid %llu\n", msg, tid);
 492
 493        mutex_lock(&monc->mutex);
 494        req = __lookup_generic_req(monc, tid);
 495        if (req) {
 496                *(struct ceph_statfs *)req->buf = reply->st;
 497                req->result = 0;
 498                get_generic_request(req);
 499        }
 500        mutex_unlock(&monc->mutex);
 501        if (req) {
 502                complete_all(&req->completion);
 503                put_generic_request(req);
 504        }
 505        return;
 506
 507bad:
 508        pr_err("corrupt generic reply, tid %llu\n", tid);
 509        ceph_msg_dump(msg);
 510}
 511
 512/*
 513 * Do a synchronous statfs().
 514 */
 515int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 516{
 517        struct ceph_mon_generic_request *req;
 518        struct ceph_mon_statfs *h;
 519        int err;
 520
 521        req = kzalloc(sizeof(*req), GFP_NOFS);
 522        if (!req)
 523                return -ENOMEM;
 524
 525        kref_init(&req->kref);
 526        req->buf = buf;
 527        req->buf_len = sizeof(*buf);
 528        init_completion(&req->completion);
 529
 530        err = -ENOMEM;
 531        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
 532                                    true);
 533        if (!req->request)
 534                goto out;
 535        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
 536                                  true);
 537        if (!req->reply)
 538                goto out;
 539
 540        /* fill out request */
 541        h = req->request->front.iov_base;
 542        h->monhdr.have_version = 0;
 543        h->monhdr.session_mon = cpu_to_le16(-1);
 544        h->monhdr.session_mon_tid = 0;
 545        h->fsid = monc->monmap->fsid;
 546
 547        err = do_generic_request(monc, req);
 548
 549out:
 550        kref_put(&req->kref, release_generic_request);
 551        return err;
 552}
 553EXPORT_SYMBOL(ceph_monc_do_statfs);
 554
 555/*
 556 * pool ops
 557 */
 558static int get_poolop_reply_buf(const char *src, size_t src_len,
 559                                char *dst, size_t dst_len)
 560{
 561        u32 buf_len;
 562
 563        if (src_len != sizeof(u32) + dst_len)
 564                return -EINVAL;
 565
 566        buf_len = le32_to_cpu(*(u32 *)src);
 567        if (buf_len != dst_len)
 568                return -EINVAL;
 569
 570        memcpy(dst, src + sizeof(u32), dst_len);
 571        return 0;
 572}
 573
 574static void handle_poolop_reply(struct ceph_mon_client *monc,
 575                                struct ceph_msg *msg)
 576{
 577        struct ceph_mon_generic_request *req;
 578        struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
 579        u64 tid = le64_to_cpu(msg->hdr.tid);
 580
 581        if (msg->front.iov_len < sizeof(*reply))
 582                goto bad;
 583        dout("handle_poolop_reply %p tid %llu\n", msg, tid);
 584
 585        mutex_lock(&monc->mutex);
 586        req = __lookup_generic_req(monc, tid);
 587        if (req) {
 588                if (req->buf_len &&
 589                    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
 590                                     msg->front.iov_len - sizeof(*reply),
 591                                     req->buf, req->buf_len) < 0) {
 592                        mutex_unlock(&monc->mutex);
 593                        goto bad;
 594                }
 595                req->result = le32_to_cpu(reply->reply_code);
 596                get_generic_request(req);
 597        }
 598        mutex_unlock(&monc->mutex);
 599        if (req) {
 600                complete(&req->completion);
 601                put_generic_request(req);
 602        }
 603        return;
 604
 605bad:
 606        pr_err("corrupt generic reply, tid %llu\n", tid);
 607        ceph_msg_dump(msg);
 608}
 609
 610/*
 611 * Do a synchronous pool op.
 612 */
 613int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
 614                        u32 pool, u64 snapid,
 615                        char *buf, int len)
 616{
 617        struct ceph_mon_generic_request *req;
 618        struct ceph_mon_poolop *h;
 619        int err;
 620
 621        req = kzalloc(sizeof(*req), GFP_NOFS);
 622        if (!req)
 623                return -ENOMEM;
 624
 625        kref_init(&req->kref);
 626        req->buf = buf;
 627        req->buf_len = len;
 628        init_completion(&req->completion);
 629
 630        err = -ENOMEM;
 631        req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
 632                                    true);
 633        if (!req->request)
 634                goto out;
 635        req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
 636                                  true);
 637        if (!req->reply)
 638                goto out;
 639
 640        /* fill out request */
 641        req->request->hdr.version = cpu_to_le16(2);
 642        h = req->request->front.iov_base;
 643        h->monhdr.have_version = 0;
 644        h->monhdr.session_mon = cpu_to_le16(-1);
 645        h->monhdr.session_mon_tid = 0;
 646        h->fsid = monc->monmap->fsid;
 647        h->pool = cpu_to_le32(pool);
 648        h->op = cpu_to_le32(op);
 649        h->auid = 0;
 650        h->snapid = cpu_to_le64(snapid);
 651        h->name_len = 0;
 652
 653        err = do_generic_request(monc, req);
 654
 655out:
 656        kref_put(&req->kref, release_generic_request);
 657        return err;
 658}
 659
 660int ceph_monc_create_snapid(struct ceph_mon_client *monc,
 661                            u32 pool, u64 *snapid)
 662{
 663        return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
 664                                   pool, 0, (char *)snapid, sizeof(*snapid));
 665
 666}
 667EXPORT_SYMBOL(ceph_monc_create_snapid);
 668
 669int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
 670                            u32 pool, u64 snapid)
 671{
 672        return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
 673                                   pool, snapid, 0, 0);
 674
 675}
 676
 677/*
 678 * Resend pending generic requests.
 679 */
 680static void __resend_generic_request(struct ceph_mon_client *monc)
 681{
 682        struct ceph_mon_generic_request *req;
 683        struct rb_node *p;
 684
 685        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
 686                req = rb_entry(p, struct ceph_mon_generic_request, node);
 687                ceph_con_revoke(monc->con, req->request);
 688                ceph_con_send(monc->con, ceph_msg_get(req->request));
 689        }
 690}
 691
 692/*
 693 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 694 * renew/retry subscription as needed (in case it is timing out, or we
 695 * got an ENOMEM).  And keep the monitor connection alive.
 696 */
 697static void delayed_work(struct work_struct *work)
 698{
 699        struct ceph_mon_client *monc =
 700                container_of(work, struct ceph_mon_client, delayed_work.work);
 701
 702        dout("monc delayed_work\n");
 703        mutex_lock(&monc->mutex);
 704        if (monc->hunting) {
 705                __close_session(monc);
 706                __open_session(monc);  /* continue hunting */
 707        } else {
 708                ceph_con_keepalive(monc->con);
 709
 710                __validate_auth(monc);
 711
 712                if (monc->auth->ops->is_authenticated(monc->auth))
 713                        __send_subscribe(monc);
 714        }
 715        __schedule_delayed(monc);
 716        mutex_unlock(&monc->mutex);
 717}
 718
 719/*
 720 * On startup, we build a temporary monmap populated with the IPs
 721 * provided by mount(2).
 722 */
 723static int build_initial_monmap(struct ceph_mon_client *monc)
 724{
 725        struct ceph_options *opt = monc->client->options;
 726        struct ceph_entity_addr *mon_addr = opt->mon_addr;
 727        int num_mon = opt->num_mon;
 728        int i;
 729
 730        /* build initial monmap */
 731        monc->monmap = kzalloc(sizeof(*monc->monmap) +
 732                               num_mon*sizeof(monc->monmap->mon_inst[0]),
 733                               GFP_KERNEL);
 734        if (!monc->monmap)
 735                return -ENOMEM;
 736        for (i = 0; i < num_mon; i++) {
 737                monc->monmap->mon_inst[i].addr = mon_addr[i];
 738                monc->monmap->mon_inst[i].addr.nonce = 0;
 739                monc->monmap->mon_inst[i].name.type =
 740                        CEPH_ENTITY_TYPE_MON;
 741                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
 742        }
 743        monc->monmap->num_mon = num_mon;
 744        monc->have_fsid = false;
 745        return 0;
 746}
 747
 748int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 749{
 750        int err = 0;
 751
 752        dout("init\n");
 753        memset(monc, 0, sizeof(*monc));
 754        monc->client = cl;
 755        monc->monmap = NULL;
 756        mutex_init(&monc->mutex);
 757
 758        err = build_initial_monmap(monc);
 759        if (err)
 760                goto out;
 761
 762        /* connection */
 763        monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
 764        if (!monc->con)
 765                goto out_monmap;
 766        ceph_con_init(monc->client->msgr, monc->con);
 767        monc->con->private = monc;
 768        monc->con->ops = &mon_con_ops;
 769
 770        /* authentication */
 771        monc->auth = ceph_auth_init(cl->options->name,
 772                                    cl->options->key);
 773        if (IS_ERR(monc->auth)) {
 774                err = PTR_ERR(monc->auth);
 775                goto out_con;
 776        }
 777        monc->auth->want_keys =
 778                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
 779                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
 780
 781        /* msgs */
 782        err = -ENOMEM;
 783        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
 784                                     sizeof(struct ceph_mon_subscribe_ack),
 785                                     GFP_NOFS, true);
 786        if (!monc->m_subscribe_ack)
 787                goto out_auth;
 788
 789        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
 790                                         true);
 791        if (!monc->m_subscribe)
 792                goto out_subscribe_ack;
 793
 794        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
 795                                          true);
 796        if (!monc->m_auth_reply)
 797                goto out_subscribe;
 798
 799        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
 800        monc->pending_auth = 0;
 801        if (!monc->m_auth)
 802                goto out_auth_reply;
 803
 804        monc->cur_mon = -1;
 805        monc->hunting = true;
 806        monc->sub_renew_after = jiffies;
 807        monc->sub_sent = 0;
 808
 809        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
 810        monc->generic_request_tree = RB_ROOT;
 811        monc->num_generic_requests = 0;
 812        monc->last_tid = 0;
 813
 814        monc->have_mdsmap = 0;
 815        monc->have_osdmap = 0;
 816        monc->want_next_osdmap = 1;
 817        return 0;
 818
 819out_auth_reply:
 820        ceph_msg_put(monc->m_auth_reply);
 821out_subscribe:
 822        ceph_msg_put(monc->m_subscribe);
 823out_subscribe_ack:
 824        ceph_msg_put(monc->m_subscribe_ack);
 825out_auth:
 826        ceph_auth_destroy(monc->auth);
 827out_con:
 828        monc->con->ops->put(monc->con);
 829out_monmap:
 830        kfree(monc->monmap);
 831out:
 832        return err;
 833}
 834EXPORT_SYMBOL(ceph_monc_init);
 835
 836void ceph_monc_stop(struct ceph_mon_client *monc)
 837{
 838        dout("stop\n");
 839        cancel_delayed_work_sync(&monc->delayed_work);
 840
 841        mutex_lock(&monc->mutex);
 842        __close_session(monc);
 843
 844        monc->con->private = NULL;
 845        monc->con->ops->put(monc->con);
 846        monc->con = NULL;
 847
 848        mutex_unlock(&monc->mutex);
 849
 850        ceph_auth_destroy(monc->auth);
 851
 852        ceph_msg_put(monc->m_auth);
 853        ceph_msg_put(monc->m_auth_reply);
 854        ceph_msg_put(monc->m_subscribe);
 855        ceph_msg_put(monc->m_subscribe_ack);
 856
 857        kfree(monc->monmap);
 858}
 859EXPORT_SYMBOL(ceph_monc_stop);
 860
 861static void handle_auth_reply(struct ceph_mon_client *monc,
 862                              struct ceph_msg *msg)
 863{
 864        int ret;
 865        int was_auth = 0;
 866
 867        mutex_lock(&monc->mutex);
 868        if (monc->auth->ops)
 869                was_auth = monc->auth->ops->is_authenticated(monc->auth);
 870        monc->pending_auth = 0;
 871        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 872                                     msg->front.iov_len,
 873                                     monc->m_auth->front.iov_base,
 874                                     monc->m_auth->front_max);
 875        if (ret < 0) {
 876                monc->client->auth_err = ret;
 877                wake_up_all(&monc->client->auth_wq);
 878        } else if (ret > 0) {
 879                __send_prepared_auth_request(monc, ret);
 880        } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
 881                dout("authenticated, starting session\n");
 882
 883                monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
 884                monc->client->msgr->inst.name.num =
 885                                        cpu_to_le64(monc->auth->global_id);
 886
 887                __send_subscribe(monc);
 888                __resend_generic_request(monc);
 889        }
 890        mutex_unlock(&monc->mutex);
 891}
 892
 893static int __validate_auth(struct ceph_mon_client *monc)
 894{
 895        int ret;
 896
 897        if (monc->pending_auth)
 898                return 0;
 899
 900        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
 901                              monc->m_auth->front_max);
 902        if (ret <= 0)
 903                return ret; /* either an error, or no need to authenticate */
 904        __send_prepared_auth_request(monc, ret);
 905        return 0;
 906}
 907
 908int ceph_monc_validate_auth(struct ceph_mon_client *monc)
 909{
 910        int ret;
 911
 912        mutex_lock(&monc->mutex);
 913        ret = __validate_auth(monc);
 914        mutex_unlock(&monc->mutex);
 915        return ret;
 916}
 917EXPORT_SYMBOL(ceph_monc_validate_auth);
 918
 919/*
 920 * handle incoming message
 921 */
 922static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 923{
 924        struct ceph_mon_client *monc = con->private;
 925        int type = le16_to_cpu(msg->hdr.type);
 926
 927        if (!monc)
 928                return;
 929
 930        switch (type) {
 931        case CEPH_MSG_AUTH_REPLY:
 932                handle_auth_reply(monc, msg);
 933                break;
 934
 935        case CEPH_MSG_MON_SUBSCRIBE_ACK:
 936                handle_subscribe_ack(monc, msg);
 937                break;
 938
 939        case CEPH_MSG_STATFS_REPLY:
 940                handle_statfs_reply(monc, msg);
 941                break;
 942
 943        case CEPH_MSG_POOLOP_REPLY:
 944                handle_poolop_reply(monc, msg);
 945                break;
 946
 947        case CEPH_MSG_MON_MAP:
 948                ceph_monc_handle_map(monc, msg);
 949                break;
 950
 951        case CEPH_MSG_OSD_MAP:
 952                ceph_osdc_handle_map(&monc->client->osdc, msg);
 953                break;
 954
 955        default:
 956                /* can the chained handler handle it? */
 957                if (monc->client->extra_mon_dispatch &&
 958                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
 959                        break;
 960                        
 961                pr_err("received unknown message type %d %s\n", type,
 962                       ceph_msg_type_name(type));
 963        }
 964        ceph_msg_put(msg);
 965}
 966
 967/*
 968 * Allocate memory for incoming message
 969 */
 970static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 971                                      struct ceph_msg_header *hdr,
 972                                      int *skip)
 973{
 974        struct ceph_mon_client *monc = con->private;
 975        int type = le16_to_cpu(hdr->type);
 976        int front_len = le32_to_cpu(hdr->front_len);
 977        struct ceph_msg *m = NULL;
 978
 979        *skip = 0;
 980
 981        switch (type) {
 982        case CEPH_MSG_MON_SUBSCRIBE_ACK:
 983                m = ceph_msg_get(monc->m_subscribe_ack);
 984                break;
 985        case CEPH_MSG_POOLOP_REPLY:
 986        case CEPH_MSG_STATFS_REPLY:
 987                return get_generic_reply(con, hdr, skip);
 988        case CEPH_MSG_AUTH_REPLY:
 989                m = ceph_msg_get(monc->m_auth_reply);
 990                break;
 991        case CEPH_MSG_MON_MAP:
 992        case CEPH_MSG_MDS_MAP:
 993        case CEPH_MSG_OSD_MAP:
 994                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
 995                break;
 996        }
 997
 998        if (!m) {
 999                pr_info("alloc_msg unknown type %d\n", type);
1000                *skip = 1;
1001        }
1002        return m;
1003}
1004
1005/*
1006 * If the monitor connection resets, pick a new monitor and resubmit
1007 * any pending requests.
1008 */
1009static void mon_fault(struct ceph_connection *con)
1010{
1011        struct ceph_mon_client *monc = con->private;
1012
1013        if (!monc)
1014                return;
1015
1016        dout("mon_fault\n");
1017        mutex_lock(&monc->mutex);
1018        if (!con->private)
1019                goto out;
1020
1021        if (!monc->hunting)
1022                pr_info("mon%d %s session lost, "
1023                        "hunting for new mon\n", monc->cur_mon,
1024                        ceph_pr_addr(&monc->con->peer_addr.in_addr));
1025
1026        __close_session(monc);
1027        if (!monc->hunting) {
1028                /* start hunting */
1029                monc->hunting = true;
1030                __open_session(monc);
1031        } else {
1032                /* already hunting, let's wait a bit */
1033                __schedule_delayed(monc);
1034        }
1035out:
1036        mutex_unlock(&monc->mutex);
1037}
1038
1039static const struct ceph_connection_operations mon_con_ops = {
1040        .get = ceph_con_get,
1041        .put = ceph_con_put,
1042        .dispatch = dispatch,
1043        .fault = mon_fault,
1044        .alloc_msg = mon_alloc_msg,
1045};
1046
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.