linux/net/ceph/messenger.c
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/crc32c.h>
   4#include <linux/ctype.h>
   5#include <linux/highmem.h>
   6#include <linux/inet.h>
   7#include <linux/kthread.h>
   8#include <linux/net.h>
   9#include <linux/slab.h>
  10#include <linux/socket.h>
  11#include <linux/string.h>
  12#include <linux/bio.h>
  13#include <linux/blkdev.h>
  14#include <linux/dns_resolver.h>
  15#include <net/tcp.h>
  16
  17#include <linux/ceph/libceph.h>
  18#include <linux/ceph/messenger.h>
  19#include <linux/ceph/decode.h>
  20#include <linux/ceph/pagelist.h>
  21#include <linux/export.h>
  22
  23/*
  24 * Ceph uses the messenger to exchange ceph_msg messages with other
  25 * hosts in the system.  The messenger provides ordered and reliable
  26 * delivery.  We tolerate TCP disconnects by reconnecting (with
  27 * exponential backoff) in the case of a fault (disconnection, bad
  28 * crc, protocol error).  Acks allow sent messages to be discarded by
  29 * the sender.
  30 */
  31
  32/*
  33 * We track the state of the socket on a given connection using
  34 * values defined below.  The transition to a new socket state is
  35 * handled by a function which verifies we aren't coming from an
  36 * unexpected state.
  37 *
  38 *      --------
  39 *      | NEW* |  transient initial state
  40 *      --------
  41 *          | con_sock_state_init()
  42 *          v
  43 *      ----------
  44 *      | CLOSED |  initialized, but no socket (and no
  45 *      ----------  TCP connection)
  46 *       ^      \
  47 *       |       \ con_sock_state_connecting()
  48 *       |        ----------------------
  49 *       |                              \
  50 *       + con_sock_state_closed()       \
  51 *       |+---------------------------    \
  52 *       | \                          \    \
  53 *       |  -----------                \    \
  54 *       |  | CLOSING |  socket event;  \    \
  55 *       |  -----------  await close     \    \
  56 *       |       ^                        \   |
  57 *       |       |                         \  |
  58 *       |       + con_sock_state_closing() \ |
  59 *       |      / \                         | |
  60 *       |     /   ---------------          | |
  61 *       |    /                   \         v v
  62 *       |   /                    --------------
  63 *       |  /    -----------------| CONNECTING |  socket created, TCP
  64 *       |  |   /                 --------------  connect initiated
  65 *       |  |   | con_sock_state_connected()
  66 *       |  |   v
  67 *      -------------
  68 *      | CONNECTED |  TCP connection established
  69 *      -------------
  70 *
  71 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
  72 */
  73
  74#define CON_SOCK_STATE_NEW              0       /* -> CLOSED */
  75#define CON_SOCK_STATE_CLOSED           1       /* -> CONNECTING */
  76#define CON_SOCK_STATE_CONNECTING       2       /* -> CONNECTED or -> CLOSING */
  77#define CON_SOCK_STATE_CONNECTED        3       /* -> CLOSING or -> CLOSED */
  78#define CON_SOCK_STATE_CLOSING          4       /* -> CLOSED */
  79
  80/*
  81 * connection states
  82 */
  83#define CON_STATE_CLOSED        1  /* -> PREOPEN */
  84#define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
  85#define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
  86#define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
  87#define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
  88#define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
  89
  90/*
  91 * ceph_connection flag bits
  92 */
  93#define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
  94                                       * messages on errors */
  95#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
  96#define CON_FLAG_WRITE_PENDING     2  /* we have data ready to send */
  97#define CON_FLAG_SOCK_CLOSED       3  /* socket state changed to closed */
  98#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
  99
 100/* static tag bytes (protocol control messages) */
 101static char tag_msg = CEPH_MSGR_TAG_MSG;
 102static char tag_ack = CEPH_MSGR_TAG_ACK;
 103static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 104
 105#ifdef CONFIG_LOCKDEP
 106static struct lock_class_key socket_class;
 107#endif
 108
 109/*
 110 * When skipping (ignoring) a block of input we read it into a "skip
 111 * buffer," which is this many bytes in size.
 112 */
 113#define SKIP_BUF_SIZE   1024
 114
 115static void queue_con(struct ceph_connection *con);
 116static void con_work(struct work_struct *);
 117static void ceph_fault(struct ceph_connection *con);
 118
 119/*
 120 * Nicely render a sockaddr as a string.  An array of formatted
 121 * strings is used, to approximate reentrancy.
 122 */
 123#define ADDR_STR_COUNT_LOG      5       /* log2(# address strings in array) */
 124#define ADDR_STR_COUNT          (1 << ADDR_STR_COUNT_LOG)
 125#define ADDR_STR_COUNT_MASK     (ADDR_STR_COUNT - 1)
 126#define MAX_ADDR_STR_LEN        64      /* 54 is enough */
 127
 128static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
 129static atomic_t addr_str_seq = ATOMIC_INIT(0);
 130
 131static struct page *zero_page;          /* used in certain error cases */
 132
 133const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 134{
 135        int i;
 136        char *s;
 137        struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
 138        struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
 139
 140        i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
 141        s = addr_str[i];
 142
 143        switch (ss->ss_family) {
 144        case AF_INET:
 145                snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
 146                         ntohs(in4->sin_port));
 147                break;
 148
 149        case AF_INET6:
 150                snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
 151                         ntohs(in6->sin6_port));
 152                break;
 153
 154        default:
 155                snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
 156                         ss->ss_family);
 157        }
 158
 159        return s;
 160}
 161EXPORT_SYMBOL(ceph_pr_addr);
 162
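/*
 * Keep a wire-encoded copy of our own address around; it is sent to
 * the peer as part of the banner exchange.
 */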
 163static void encode_my_addr(struct ceph_messenger *msgr)
 164{
 165        memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
 166        ceph_encode_addr(&msgr->my_enc_addr);
 167}
 168
 169/*
 170 * work queue for all reading and writing to/from the socket.
 171 */
 172static struct workqueue_struct *ceph_msgr_wq;
 173
  174static void _ceph_msgr_exit(void)
 175{
 176        if (ceph_msgr_wq) {
 177                destroy_workqueue(ceph_msgr_wq);
 178                ceph_msgr_wq = NULL;
 179        }
 180
 181        BUG_ON(zero_page == NULL);
 182        kunmap(zero_page);
 183        page_cache_release(zero_page);
 184        zero_page = NULL;
 185}
 186
 187int ceph_msgr_init(void)
 188{
 189        BUG_ON(zero_page != NULL);
 190        zero_page = ZERO_PAGE(0);
 191        page_cache_get(zero_page);
 192
 193        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
 194        if (ceph_msgr_wq)
 195                return 0;
 196
 197        pr_err("msgr_init failed to create workqueue\n");
 198        _ceph_msgr_exit();
 199
 200        return -ENOMEM;
 201}
 202EXPORT_SYMBOL(ceph_msgr_init);
 203
 204void ceph_msgr_exit(void)
 205{
 206        BUG_ON(ceph_msgr_wq == NULL);
 207
 208        _ceph_msgr_exit();
 209}
 210EXPORT_SYMBOL(ceph_msgr_exit);
 211
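/*
 * Wait for any queued messenger work (con_work) to finish.
 */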
 212void ceph_msgr_flush(void)
 213{
 214        flush_workqueue(ceph_msgr_wq);
 215}
 216EXPORT_SYMBOL(ceph_msgr_flush);
 217
 218/* Connection socket state transition functions */
 219
 220static void con_sock_state_init(struct ceph_connection *con)
 221{
 222        int old_state;
 223
 224        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 225        if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
 226                printk("%s: unexpected old state %d\n", __func__, old_state);
 227        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 228             CON_SOCK_STATE_CLOSED);
 229}
 230
 231static void con_sock_state_connecting(struct ceph_connection *con)
 232{
 233        int old_state;
 234
 235        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
 236        if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
 237                printk("%s: unexpected old state %d\n", __func__, old_state);
 238        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 239             CON_SOCK_STATE_CONNECTING);
 240}
 241
 242static void con_sock_state_connected(struct ceph_connection *con)
 243{
 244        int old_state;
 245
 246        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
 247        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
 248                printk("%s: unexpected old state %d\n", __func__, old_state);
 249        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 250             CON_SOCK_STATE_CONNECTED);
 251}
 252
 253static void con_sock_state_closing(struct ceph_connection *con)
 254{
 255        int old_state;
 256
 257        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
 258        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
 259                        old_state != CON_SOCK_STATE_CONNECTED &&
 260                        old_state != CON_SOCK_STATE_CLOSING))
 261                printk("%s: unexpected old state %d\n", __func__, old_state);
 262        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 263             CON_SOCK_STATE_CLOSING);
 264}
 265
 266static void con_sock_state_closed(struct ceph_connection *con)
 267{
 268        int old_state;
 269
 270        old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
 271        if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
 272                    old_state != CON_SOCK_STATE_CLOSING &&
 273                    old_state != CON_SOCK_STATE_CONNECTING &&
 274                    old_state != CON_SOCK_STATE_CLOSED))
 275                printk("%s: unexpected old state %d\n", __func__, old_state);
 276        dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
 277             CON_SOCK_STATE_CLOSED);
 278}
 279
 280/*
 281 * socket callback functions
 282 */
 283
 284/* data available on socket, or listen socket received a connect */
 285static void ceph_sock_data_ready(struct sock *sk, int count_unused)
 286{
 287        struct ceph_connection *con = sk->sk_user_data;
 288        if (atomic_read(&con->msgr->stopping)) {
 289                return;
 290        }
 291
 292        if (sk->sk_state != TCP_CLOSE_WAIT) {
 293                dout("%s on %p state = %lu, queueing work\n", __func__,
 294                     con, con->state);
 295                queue_con(con);
 296        }
 297}
 298
 299/* socket has buffer space for writing */
 300static void ceph_sock_write_space(struct sock *sk)
 301{
 302        struct ceph_connection *con = sk->sk_user_data;
 303
 304        /* only queue to workqueue if there is data we want to write,
 305         * and there is sufficient space in the socket buffer to accept
 306         * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
 307         * doesn't get called again until try_write() fills the socket
 308         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
 309         * and net/core/stream.c:sk_stream_write_space().
 310         */
 311        if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
 312                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
 313                        dout("%s %p queueing write work\n", __func__, con);
 314                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 315                        queue_con(con);
 316                }
 317        } else {
 318                dout("%s %p nothing to write\n", __func__, con);
 319        }
 320}
 321
 322/* socket's state has changed */
 323static void ceph_sock_state_change(struct sock *sk)
 324{
 325        struct ceph_connection *con = sk->sk_user_data;
 326
 327        dout("%s %p state = %lu sk_state = %u\n", __func__,
 328             con, con->state, sk->sk_state);
 329
 330        switch (sk->sk_state) {
 331        case TCP_CLOSE:
 332                dout("%s TCP_CLOSE\n", __func__);
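                /* fall through */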
 333        case TCP_CLOSE_WAIT:
 334                dout("%s TCP_CLOSE_WAIT\n", __func__);
 335                con_sock_state_closing(con);
 336                set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
 337                queue_con(con);
 338                break;
 339        case TCP_ESTABLISHED:
 340                dout("%s TCP_ESTABLISHED\n", __func__);
 341                con_sock_state_connected(con);
 342                queue_con(con);
 343                break;
 344        default:        /* Everything else is uninteresting */
 345                break;
 346        }
 347}
 348
 349/*
 350 * set up socket callbacks
 351 */
 352static void set_sock_callbacks(struct socket *sock,
 353                               struct ceph_connection *con)
 354{
 355        struct sock *sk = sock->sk;
 356        sk->sk_user_data = con;
 357        sk->sk_data_ready = ceph_sock_data_ready;
 358        sk->sk_write_space = ceph_sock_write_space;
 359        sk->sk_state_change = ceph_sock_state_change;
 360}
 361
 362
 363/*
 364 * socket helpers
 365 */
 366
 367/*
 368 * initiate connection to a remote socket.
 369 */
 370static int ceph_tcp_connect(struct ceph_connection *con)
 371{
 372        struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
 373        struct socket *sock;
 374        int ret;
 375
 376        BUG_ON(con->sock);
 377        ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
 378                               IPPROTO_TCP, &sock);
 379        if (ret)
 380                return ret;
 381        sock->sk->sk_allocation = GFP_NOFS;
 382
 383#ifdef CONFIG_LOCKDEP
 384        lockdep_set_class(&sock->sk->sk_lock, &socket_class);
 385#endif
 386
 387        set_sock_callbacks(sock, con);
 388
 389        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 390
 391        con_sock_state_connecting(con);
 392        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
 393                                 O_NONBLOCK);
 394        if (ret == -EINPROGRESS) {
 395                dout("connect %s EINPROGRESS sk_state = %u\n",
 396                     ceph_pr_addr(&con->peer_addr.in_addr),
 397                     sock->sk->sk_state);
 398        } else if (ret < 0) {
 399                pr_err("connect %s error %d\n",
 400                       ceph_pr_addr(&con->peer_addr.in_addr), ret);
 401                sock_release(sock);
 402                con->error_msg = "connect error";
 403
 404                return ret;
 405        }
 406        con->sock = sock;
 407        return 0;
 408}
 409
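/*
 * Non-blocking read into a single buffer.  Returns the number of bytes
 * received, 0 if the read would have blocked, or a negative error.
 */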
 410static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 411{
 412        struct kvec iov = {buf, len};
 413        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 414        int r;
 415
 416        r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
 417        if (r == -EAGAIN)
 418                r = 0;
 419        return r;
 420}
 421
 422/*
 423 * write something.  @more is true if caller will be sending more data
 424 * shortly.
 425 */
 426static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 427                     size_t kvlen, size_t len, int more)
 428{
 429        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 430        int r;
 431
 432        if (more)
 433                msg.msg_flags |= MSG_MORE;
 434        else
 435                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
 436
 437        r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
 438        if (r == -EAGAIN)
 439                r = 0;
 440        return r;
 441}
 442
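/*
 * Like ceph_tcp_sendmsg(), but send the data straight from a page via
 * kernel_sendpage().  As above, -EAGAIN is reported as 0 (no progress).
 */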
 443static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
 444                     int offset, size_t size, int more)
 445{
 446        int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
 447        int ret;
 448
 449        ret = kernel_sendpage(sock, page, offset, size, flags);
 450        if (ret == -EAGAIN)
 451                ret = 0;
 452
 453        return ret;
 454}
 455
 456
 457/*
 458 * Shutdown/close the socket for the given connection.
 459 */
 460static int con_close_socket(struct ceph_connection *con)
 461{
 462        int rc = 0;
 463
 464        dout("con_close_socket on %p sock %p\n", con, con->sock);
 465        if (con->sock) {
 466                rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
 467                sock_release(con->sock);
 468                con->sock = NULL;
 469        }
 470
 471        /*
 472         * Forcibly clear the SOCK_CLOSED flag.  It gets set
 473         * independent of the connection mutex, and we could have
 474         * received a socket close event before we had the chance to
 475         * shut the socket down.
 476         */
 477        clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
 478
 479        con_sock_state_closed(con);
 480        return rc;
 481}
 482
 483/*
 484 * Reset a connection.  Discard all incoming and outgoing messages
 485 * and clear *_seq state.
 486 */
 487static void ceph_msg_remove(struct ceph_msg *msg)
 488{
 489        list_del_init(&msg->list_head);
 490        BUG_ON(msg->con == NULL);
 491        msg->con->ops->put(msg->con);
 492        msg->con = NULL;
 493
 494        ceph_msg_put(msg);
 495}
 496static void ceph_msg_remove_list(struct list_head *head)
 497{
 498        while (!list_empty(head)) {
 499                struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
 500                                                        list_head);
 501                ceph_msg_remove(msg);
 502        }
 503}
 504
 505static void reset_connection(struct ceph_connection *con)
 506{
 507        /* reset connection, out_queue, msg_ and connect_seq */
 508        /* discard existing out_queue and msg_seq */
 509        dout("reset_connection %p\n", con);
 510        ceph_msg_remove_list(&con->out_queue);
 511        ceph_msg_remove_list(&con->out_sent);
 512
 513        if (con->in_msg) {
 514                BUG_ON(con->in_msg->con != con);
 515                con->in_msg->con = NULL;
 516                ceph_msg_put(con->in_msg);
 517                con->in_msg = NULL;
 518                con->ops->put(con);
 519        }
 520
 521        con->connect_seq = 0;
 522        con->out_seq = 0;
 523        if (con->out_msg) {
 524                ceph_msg_put(con->out_msg);
 525                con->out_msg = NULL;
 526        }
 527        con->in_seq = 0;
 528        con->in_seq_acked = 0;
 529}
 530
 531/*
 532 * mark a peer down.  drop any open connections.
 533 */
 534void ceph_con_close(struct ceph_connection *con)
 535{
 536        mutex_lock(&con->mutex);
 537        dout("con_close %p peer %s\n", con,
 538             ceph_pr_addr(&con->peer_addr.in_addr));
 539        con->state = CON_STATE_CLOSED;
 540
 541        clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
 542        clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
 543        clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 545        clear_bit(CON_FLAG_BACKOFF, &con->flags);
 546
 547        reset_connection(con);
 548        con->peer_global_seq = 0;
 549        cancel_delayed_work(&con->work);
 550        con_close_socket(con);
 551        mutex_unlock(&con->mutex);
 552}
 553EXPORT_SYMBOL(ceph_con_close);
 554
 555/*
 556 * Reopen a closed connection, with a new peer address.
 557 */
 558void ceph_con_open(struct ceph_connection *con,
 559                   __u8 entity_type, __u64 entity_num,
 560                   struct ceph_entity_addr *addr)
 561{
 562        mutex_lock(&con->mutex);
 563        dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
 564
 565        WARN_ON(con->state != CON_STATE_CLOSED);
 566        con->state = CON_STATE_PREOPEN;
 567
 568        con->peer_name.type = (__u8) entity_type;
 569        con->peer_name.num = cpu_to_le64(entity_num);
 570
 571        memcpy(&con->peer_addr, addr, sizeof(*addr));
 572        con->delay = 0;      /* reset backoff memory */
 573        mutex_unlock(&con->mutex);
 574        queue_con(con);
 575}
 576EXPORT_SYMBOL(ceph_con_open);
 577
 578/*
 579 * return true if this connection ever successfully opened
 580 */
 581bool ceph_con_opened(struct ceph_connection *con)
 582{
 583        return con->connect_seq > 0;
 584}
 585
 586/*
 587 * initialize a new connection.
 588 */
 589void ceph_con_init(struct ceph_connection *con, void *private,
 590        const struct ceph_connection_operations *ops,
 591        struct ceph_messenger *msgr)
 592{
 593        dout("con_init %p\n", con);
 594        memset(con, 0, sizeof(*con));
 595        con->private = private;
 596        con->ops = ops;
 597        con->msgr = msgr;
 598
 599        con_sock_state_init(con);
 600
 601        mutex_init(&con->mutex);
 602        INIT_LIST_HEAD(&con->out_queue);
 603        INIT_LIST_HEAD(&con->out_sent);
 604        INIT_DELAYED_WORK(&con->work, con_work);
 605
 606        con->state = CON_STATE_CLOSED;
 607}
 608EXPORT_SYMBOL(ceph_con_init);
 609
 610
 611/*
 612 * We maintain a global counter to order connection attempts.  Get
 613 * a unique seq greater than @gt.
 614 */
 615static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 616{
 617        u32 ret;
 618
 619        spin_lock(&msgr->global_seq_lock);
 620        if (msgr->global_seq < gt)
 621                msgr->global_seq = gt;
 622        ret = ++msgr->global_seq;
 623        spin_unlock(&msgr->global_seq_lock);
 624        return ret;
 625}
 626
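/*
 * Outgoing data is staged in the con->out_kvec[] array; these helpers
 * reset it and append an entry, keeping the element and byte counts
 * up to date.
 */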
 627static void con_out_kvec_reset(struct ceph_connection *con)
 628{
 629        con->out_kvec_left = 0;
 630        con->out_kvec_bytes = 0;
 631        con->out_kvec_cur = &con->out_kvec[0];
 632}
 633
 634static void con_out_kvec_add(struct ceph_connection *con,
 635                                size_t size, void *data)
 636{
 637        int index;
 638
 639        index = con->out_kvec_left;
 640        BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
 641
 642        con->out_kvec[index].iov_len = size;
 643        con->out_kvec[index].iov_base = data;
 644        con->out_kvec_left++;
 645        con->out_kvec_bytes += size;
 646}
 647
 648#ifdef CONFIG_BLOCK
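/*
 * Position a bio iterator at the start of @bio (or clear it if @bio is
 * NULL).  iter_bio_next() advances one segment at a time, moving to the
 * next bio in the chain when the current one is exhausted.
 */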
 649static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
 650{
 651        if (!bio) {
 652                *iter = NULL;
 653                *seg = 0;
 654                return;
 655        }
 656        *iter = bio;
 657        *seg = bio->bi_idx;
 658}
 659
 660static void iter_bio_next(struct bio **bio_iter, int *seg)
 661{
 662        if (*bio_iter == NULL)
 663                return;
 664
 665        BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
 666
 667        (*seg)++;
 668        if (*seg == (*bio_iter)->bi_vcnt)
 669                init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
 670}
 671#endif
 672
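/*
 * Initialize the outgoing-data cursor (page index, offset within the
 * page, bytes sent so far) for the message we are about to write.
 */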
 673static void prepare_write_message_data(struct ceph_connection *con)
 674{
 675        struct ceph_msg *msg = con->out_msg;
 676
 677        BUG_ON(!msg);
 678        BUG_ON(!msg->hdr.data_len);
 679
 680        /* initialize page iterator */
 681        con->out_msg_pos.page = 0;
 682        if (msg->pages)
 683                con->out_msg_pos.page_pos = msg->page_alignment;
 684        else
 685                con->out_msg_pos.page_pos = 0;
 686#ifdef CONFIG_BLOCK
 687        if (msg->bio)
 688                init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
 689#endif
 690        con->out_msg_pos.data_pos = 0;
 691        con->out_msg_pos.did_page_crc = false;
 692        con->out_more = 1;  /* data + footer will follow */
 693}
 694
 695/*
 696 * Prepare footer for currently outgoing message, and finish things
 697 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 698 */
 699static void prepare_write_message_footer(struct ceph_connection *con)
 700{
 701        struct ceph_msg *m = con->out_msg;
 702        int v = con->out_kvec_left;
 703
 704        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 705
 706        dout("prepare_write_message_footer %p\n", con);
 707        con->out_kvec_is_msg = true;
 708        con->out_kvec[v].iov_base = &m->footer;
 709        con->out_kvec[v].iov_len = sizeof(m->footer);
 710        con->out_kvec_bytes += sizeof(m->footer);
 711        con->out_kvec_left++;
 712        con->out_more = m->more_to_follow;
 713        con->out_msg_done = true;
 714}
 715
 716/*
 717 * Prepare headers for the next outgoing message.
 718 */
 719static void prepare_write_message(struct ceph_connection *con)
 720{
 721        struct ceph_msg *m;
 722        u32 crc;
 723
 724        con_out_kvec_reset(con);
 725        con->out_kvec_is_msg = true;
 726        con->out_msg_done = false;
 727
 728        /* Sneak an ack in there first?  If we can get it into the same
 729         * TCP packet that's a good thing. */
 730        if (con->in_seq > con->in_seq_acked) {
 731                con->in_seq_acked = con->in_seq;
 732                con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 733                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
 734                con_out_kvec_add(con, sizeof (con->out_temp_ack),
 735                        &con->out_temp_ack);
 736        }
 737
 738        BUG_ON(list_empty(&con->out_queue));
 739        m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 740        con->out_msg = m;
 741        BUG_ON(m->con != con);
 742
 743        /* put message on sent list */
 744        ceph_msg_get(m);
 745        list_move_tail(&m->list_head, &con->out_sent);
 746
 747        /*
 748         * only assign outgoing seq # if we haven't sent this message
  749         * yet.  if it is requeued, resend with its original seq.
 750         */
 751        if (m->needs_out_seq) {
 752                m->hdr.seq = cpu_to_le64(++con->out_seq);
 753                m->needs_out_seq = false;
 754        }
 755#ifdef CONFIG_BLOCK
 756        else
 757                m->bio_iter = NULL;
 758#endif
 759
 760        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
 761             m, con->out_seq, le16_to_cpu(m->hdr.type),
 762             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
 763             le32_to_cpu(m->hdr.data_len),
 764             m->nr_pages);
 765        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 766
 767        /* tag + hdr + front + middle */
 768        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 769        con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
 770        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 771
 772        if (m->middle)
 773                con_out_kvec_add(con, m->middle->vec.iov_len,
 774                        m->middle->vec.iov_base);
 775
 776        /* fill in crc (except data pages), footer */
 777        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 778        con->out_msg->hdr.crc = cpu_to_le32(crc);
 779        con->out_msg->footer.flags = 0;
 780
 781        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 782        con->out_msg->footer.front_crc = cpu_to_le32(crc);
 783        if (m->middle) {
 784                crc = crc32c(0, m->middle->vec.iov_base,
 785                                m->middle->vec.iov_len);
 786                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 787        } else
 788                con->out_msg->footer.middle_crc = 0;
 789        dout("%s front_crc %u middle_crc %u\n", __func__,
 790             le32_to_cpu(con->out_msg->footer.front_crc),
 791             le32_to_cpu(con->out_msg->footer.middle_crc));
 792
 793        /* is there a data payload? */
 794        con->out_msg->footer.data_crc = 0;
 795        if (m->hdr.data_len)
 796                prepare_write_message_data(con);
 797        else
 798                /* no, queue up footer too and be done */
 799                prepare_write_message_footer(con);
 800
 801        set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 802}
 803
 804/*
 805 * Prepare an ack.
 806 */
 807static void prepare_write_ack(struct ceph_connection *con)
 808{
 809        dout("prepare_write_ack %p %llu -> %llu\n", con,
 810             con->in_seq_acked, con->in_seq);
 811        con->in_seq_acked = con->in_seq;
 812
 813        con_out_kvec_reset(con);
 814
 815        con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 816
 817        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
 818        con_out_kvec_add(con, sizeof (con->out_temp_ack),
 819                                &con->out_temp_ack);
 820
 821        con->out_more = 1;  /* more will follow.. eventually.. */
 822        set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 823}
 824
 825/*
 826 * Prepare to write keepalive byte.
 827 */
 828static void prepare_write_keepalive(struct ceph_connection *con)
 829{
 830        dout("prepare_write_keepalive %p\n", con);
 831        con_out_kvec_reset(con);
 832        con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
 833        set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 834}
 835
 836/*
 837 * Connection negotiation.
 838 */
 839
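/*
 * Ask the upper layer for an authorizer to include in our connect
 * request.  The connection mutex is dropped around the callback, so the
 * connection state must be re-checked afterwards.
 */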
 840static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
 841                                                int *auth_proto)
 842{
 843        struct ceph_auth_handshake *auth;
 844
 845        if (!con->ops->get_authorizer) {
 846                con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 847                con->out_connect.authorizer_len = 0;
 848                return NULL;
 849        }
 850
 851        /* Can't hold the mutex while getting authorizer */
 852        mutex_unlock(&con->mutex);
 853        auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
 854        mutex_lock(&con->mutex);
 855
 856        if (IS_ERR(auth))
 857                return auth;
 858        if (con->state != CON_STATE_NEGOTIATING)
 859                return ERR_PTR(-EAGAIN);
 860
 861        con->auth_reply_buf = auth->authorizer_reply_buf;
 862        con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
 863        return auth;
 864}
 865
 866/*
 867 * We connected to a peer and are saying hello.
 868 */
 869static void prepare_write_banner(struct ceph_connection *con)
 870{
 871        con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 872        con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 873                                        &con->msgr->my_enc_addr);
 874
 875        con->out_more = 0;
 876        set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 877}
 878
 879static int prepare_write_connect(struct ceph_connection *con)
 880{
 881        unsigned int global_seq = get_global_seq(con->msgr, 0);
 882        int proto;
 883        int auth_proto;
 884        struct ceph_auth_handshake *auth;
 885
 886        switch (con->peer_name.type) {
 887        case CEPH_ENTITY_TYPE_MON:
 888                proto = CEPH_MONC_PROTOCOL;
 889                break;
 890        case CEPH_ENTITY_TYPE_OSD:
 891                proto = CEPH_OSDC_PROTOCOL;
 892                break;
 893        case CEPH_ENTITY_TYPE_MDS:
 894                proto = CEPH_MDSC_PROTOCOL;
 895                break;
 896        default:
 897                BUG();
 898        }
 899
 900        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 901             con->connect_seq, global_seq, proto);
 902
 903        con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
 904        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 905        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
 906        con->out_connect.global_seq = cpu_to_le32(global_seq);
 907        con->out_connect.protocol_version = cpu_to_le32(proto);
 908        con->out_connect.flags = 0;
 909
 910        auth_proto = CEPH_AUTH_UNKNOWN;
 911        auth = get_connect_authorizer(con, &auth_proto);
 912        if (IS_ERR(auth))
 913                return PTR_ERR(auth);
 914
 915        con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
 916        con->out_connect.authorizer_len = auth ?
 917                cpu_to_le32(auth->authorizer_buf_len) : 0;
 918
 919        con_out_kvec_add(con, sizeof (con->out_connect),
 920                                        &con->out_connect);
 921        if (auth && auth->authorizer_buf_len)
 922                con_out_kvec_add(con, auth->authorizer_buf_len,
 923                                        auth->authorizer_buf);
 924
 925        con->out_more = 0;
 926        set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 927
 928        return 0;
 929}
 930
 931/*
 932 * write as much of pending kvecs to the socket as we can.
 933 *  1 -> done
 934 *  0 -> socket full, but more to do
 935 * <0 -> error
 936 */
 937static int write_partial_kvec(struct ceph_connection *con)
 938{
 939        int ret;
 940
 941        dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
 942        while (con->out_kvec_bytes > 0) {
 943                ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
 944                                       con->out_kvec_left, con->out_kvec_bytes,
 945                                       con->out_more);
 946                if (ret <= 0)
 947                        goto out;
 948                con->out_kvec_bytes -= ret;
 949                if (con->out_kvec_bytes == 0)
 950                        break;            /* done */
 951
 952                /* account for full iov entries consumed */
 953                while (ret >= con->out_kvec_cur->iov_len) {
 954                        BUG_ON(!con->out_kvec_left);
 955                        ret -= con->out_kvec_cur->iov_len;
 956                        con->out_kvec_cur++;
 957                        con->out_kvec_left--;
 958                }
 959                /* and for a partially-consumed entry */
 960                if (ret) {
 961                        con->out_kvec_cur->iov_len -= ret;
 962                        con->out_kvec_cur->iov_base += ret;
 963                }
 964        }
 965        con->out_kvec_left = 0;
 966        con->out_kvec_is_msg = false;
 967        ret = 1;
 968out:
 969        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
 970             con->out_kvec_bytes, con->out_kvec_left, ret);
 971        return ret;  /* done! */
 972}
 973
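/*
 * Advance the outgoing-data cursor by @sent bytes.  Once a full page
 * (or pagelist/trail entry, or bio segment) has been written, move on
 * to the next one and re-arm the per-page CRC calculation.
 */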
 974static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 975                        size_t len, size_t sent, bool in_trail)
 976{
 977        struct ceph_msg *msg = con->out_msg;
 978
 979        BUG_ON(!msg);
 980        BUG_ON(!sent);
 981
 982        con->out_msg_pos.data_pos += sent;
 983        con->out_msg_pos.page_pos += sent;
 984        if (sent < len)
 985                return;
 986
 987        BUG_ON(sent != len);
 988        con->out_msg_pos.page_pos = 0;
 989        con->out_msg_pos.page++;
 990        con->out_msg_pos.did_page_crc = false;
 991        if (in_trail)
 992                list_move_tail(&page->lru,
 993                               &msg->trail->head);
 994        else if (msg->pagelist)
 995                list_move_tail(&page->lru,
 996                               &msg->pagelist->head);
 997#ifdef CONFIG_BLOCK
 998        else if (msg->bio)
 999                iter_bio_next(&msg->bio_iter, &msg->bio_seg);
1000#endif
1001}
1002
1003/*
1004 * Write as much message data payload as we can.  If we finish, queue
1005 * up the footer.
1006 *  1 -> done, footer is now queued in out_kvec[].
1007 *  0 -> socket full, but more to do
1008 * <0 -> error
1009 */
1010static int write_partial_msg_pages(struct ceph_connection *con)
1011{
1012        struct ceph_msg *msg = con->out_msg;
1013        unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
1014        size_t len;
1015        bool do_datacrc = !con->msgr->nocrc;
1016        int ret;
1017        int total_max_write;
1018        bool in_trail = false;
1019        const size_t trail_len = (msg->trail ? msg->trail->length : 0);
1020        const size_t trail_off = data_len - trail_len;
1021
1022        dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
1023             con, msg, con->out_msg_pos.page, msg->nr_pages,
1024             con->out_msg_pos.page_pos);
1025
1026        /*
1027         * Iterate through each page that contains data to be
1028         * written, and send as much as possible for each.
1029         *
1030         * If we are calculating the data crc (the default), we will
1031         * need to map the page.  If we have no pages, they have
1032         * been revoked, so use the zero page.
1033         */
1034        while (data_len > con->out_msg_pos.data_pos) {
1035                struct page *page = NULL;
1036                int max_write = PAGE_SIZE;
1037                int bio_offset = 0;
1038
1039                in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
1040                if (!in_trail)
1041                        total_max_write = trail_off - con->out_msg_pos.data_pos;
1042
1043                if (in_trail) {
1044                        total_max_write = data_len - con->out_msg_pos.data_pos;
1045
1046                        page = list_first_entry(&msg->trail->head,
1047                                                struct page, lru);
1048                } else if (msg->pages) {
1049                        page = msg->pages[con->out_msg_pos.page];
1050                } else if (msg->pagelist) {
1051                        page = list_first_entry(&msg->pagelist->head,
1052                                                struct page, lru);
1053#ifdef CONFIG_BLOCK
1054                } else if (msg->bio) {
1055                        struct bio_vec *bv;
1056
1057                        bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
1058                        page = bv->bv_page;
1059                        bio_offset = bv->bv_offset;
1060                        max_write = bv->bv_len;
1061#endif
1062                } else {
1063                        page = zero_page;
1064                }
1065                len = min_t(int, max_write - con->out_msg_pos.page_pos,
1066                            total_max_write);
1067
1068                if (do_datacrc && !con->out_msg_pos.did_page_crc) {
1069                        void *base;
1070                        u32 crc = le32_to_cpu(msg->footer.data_crc);
1071                        char *kaddr;
1072
1073                        kaddr = kmap(page);
1074                        BUG_ON(kaddr == NULL);
1075                        base = kaddr + con->out_msg_pos.page_pos + bio_offset;
1076                        crc = crc32c(crc, base, len);
1077                        kunmap(page);
1078                        msg->footer.data_crc = cpu_to_le32(crc);
1079                        con->out_msg_pos.did_page_crc = true;
1080                }
1081                ret = ceph_tcp_sendpage(con->sock, page,
1082                                      con->out_msg_pos.page_pos + bio_offset,
1083                                      len, 1);
1084                if (ret <= 0)
1085                        goto out;
1086
1087                out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
1088        }
1089
1090        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
1091
1092        /* prepare and queue up footer, too */
1093        if (!do_datacrc)
1094                msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1095        con_out_kvec_reset(con);
1096        prepare_write_message_footer(con);
1097        ret = 1;
1098out:
1099        return ret;
1100}
1101
1102/*
1103 * write some zeros
1104 */
1105static int write_partial_skip(struct ceph_connection *con)
1106{
1107        int ret;
1108
1109        while (con->out_skip > 0) {
1110                size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1111
1112                ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
1113                if (ret <= 0)
1114                        goto out;
1115                con->out_skip -= ret;
1116        }
1117        ret = 1;
1118out:
1119        return ret;
1120}
1121
1122/*
1123 * Prepare to read connection handshake, or an ack.
1124 */
1125static void prepare_read_banner(struct ceph_connection *con)
1126{
1127        dout("prepare_read_banner %p\n", con);
1128        con->in_base_pos = 0;
1129}
1130
1131static void prepare_read_connect(struct ceph_connection *con)
1132{
1133        dout("prepare_read_connect %p\n", con);
1134        con->in_base_pos = 0;
1135}
1136
1137static void prepare_read_ack(struct ceph_connection *con)
1138{
1139        dout("prepare_read_ack %p\n", con);
1140        con->in_base_pos = 0;
1141}
1142
1143static void prepare_read_tag(struct ceph_connection *con)
1144{
1145        dout("prepare_read_tag %p\n", con);
1146        con->in_base_pos = 0;
1147        con->in_tag = CEPH_MSGR_TAG_READY;
1148}
1149
1150/*
1151 * Prepare to read a message.
1152 */
1153static int prepare_read_message(struct ceph_connection *con)
1154{
1155        dout("prepare_read_message %p\n", con);
1156        BUG_ON(con->in_msg != NULL);
1157        con->in_base_pos = 0;
1158        con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
1159        return 0;
1160}
1161
1162
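/*
 * Read into @object until con->in_base_pos reaches @end.  @end is the
 * cumulative offset at which this object ends and @size is its length;
 * reading resumes at the bytes already received, i.e. at offset
 * size - (end - in_base_pos).  Returns 1 when the object is complete,
 * 0 if no more data is available yet, or a negative error.
 */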
1163static int read_partial(struct ceph_connection *con,
1164                        int end, int size, void *object)
1165{
1166        while (con->in_base_pos < end) {
1167                int left = end - con->in_base_pos;
1168                int have = size - left;
1169                int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1170                if (ret <= 0)
1171                        return ret;
1172                con->in_base_pos += ret;
1173        }
1174        return 1;
1175}
1176
1177
1178/*
1179 * Read all or part of the connect-side handshake on a new connection
1180 */
1181static int read_partial_banner(struct ceph_connection *con)
1182{
1183        int size;
1184        int end;
1185        int ret;
1186
1187        dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1188
1189        /* peer's banner */
1190        size = strlen(CEPH_BANNER);
1191        end = size;
1192        ret = read_partial(con, end, size, con->in_banner);
1193        if (ret <= 0)
1194                goto out;
1195
1196        size = sizeof (con->actual_peer_addr);
1197        end += size;
1198        ret = read_partial(con, end, size, &con->actual_peer_addr);
1199        if (ret <= 0)
1200                goto out;
1201
1202        size = sizeof (con->peer_addr_for_me);
1203        end += size;
1204        ret = read_partial(con, end, size, &con->peer_addr_for_me);
1205        if (ret <= 0)
1206                goto out;
1207
1208out:
1209        return ret;
1210}
1211
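/*
 * Read the peer's connect reply, followed by its (variable length)
 * authorizer reply.
 */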
1212static int read_partial_connect(struct ceph_connection *con)
1213{
1214        int size;
1215        int end;
1216        int ret;
1217
1218        dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1219
1220        size = sizeof (con->in_reply);
1221        end = size;
1222        ret = read_partial(con, end, size, &con->in_reply);
1223        if (ret <= 0)
1224                goto out;
1225
1226        size = le32_to_cpu(con->in_reply.authorizer_len);
1227        end += size;
1228        ret = read_partial(con, end, size, con->auth_reply_buf);
1229        if (ret <= 0)
1230                goto out;
1231
1232        dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
1233             con, (int)con->in_reply.tag,
1234             le32_to_cpu(con->in_reply.connect_seq),
1235             le32_to_cpu(con->in_reply.global_seq));
1236out:
1237        return ret;
1238
1239}
1240
1241/*
1242 * Verify the hello banner looks okay.
1243 */
1244static int verify_hello(struct ceph_connection *con)
1245{
1246        if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1247                pr_err("connect to %s got bad banner\n",
1248                       ceph_pr_addr(&con->peer_addr.in_addr));
1249                con->error_msg = "protocol error, bad banner";
1250                return -1;
1251        }
1252        return 0;
1253}
1254
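/*
 * Small helpers for poking at a sockaddr_storage without caring whether
 * it holds an IPv4 or an IPv6 address.
 */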
1255static bool addr_is_blank(struct sockaddr_storage *ss)
1256{
1257        switch (ss->ss_family) {
1258        case AF_INET:
1259                return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
1260        case AF_INET6:
1261                return
1262                     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
1263                     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
1264                     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
1265                     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
1266        }
1267        return false;
1268}
1269
1270static int addr_port(struct sockaddr_storage *ss)
1271{
1272        switch (ss->ss_family) {
1273        case AF_INET:
1274                return ntohs(((struct sockaddr_in *)ss)->sin_port);
1275        case AF_INET6:
1276                return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
1277        }
1278        return 0;
1279}
1280
1281static void addr_set_port(struct sockaddr_storage *ss, int p)
1282{
1283        switch (ss->ss_family) {
1284        case AF_INET:
1285                ((struct sockaddr_in *)ss)->sin_port = htons(p);
1286                break;
1287        case AF_INET6:
1288                ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
1289                break;
1290        }
1291}
1292
1293/*
1294 * Unlike other *_pton function semantics, zero indicates success.
1295 */
1296static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
1297                char delim, const char **ipend)
1298{
1299        struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
1300        struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
1301
1302        memset(ss, 0, sizeof(*ss));
1303
1304        if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
1305                ss->ss_family = AF_INET;
1306                return 0;
1307        }
1308
1309        if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
1310                ss->ss_family = AF_INET6;
1311                return 0;
1312        }
1313
1314        return -EINVAL;
1315}
1316
1317/*
1318 * Extract hostname string and resolve using kernel DNS facility.
1319 */
1320#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1321static int ceph_dns_resolve_name(const char *name, size_t namelen,
1322                struct sockaddr_storage *ss, char delim, const char **ipend)
1323{
1324        const char *end, *delim_p;
1325        char *colon_p, *ip_addr = NULL;
1326        int ip_len, ret;
1327
1328        /*
1329         * The end of the hostname occurs immediately preceding the delimiter or
1330         * the port marker (':') where the delimiter takes precedence.
1331         */
1332        delim_p = memchr(name, delim, namelen);
1333        colon_p = memchr(name, ':', namelen);
1334
1335        if (delim_p && colon_p)
1336                end = delim_p < colon_p ? delim_p : colon_p;
1337        else if (!delim_p && colon_p)
1338                end = colon_p;
1339        else {
1340                end = delim_p;
1341                if (!end) /* case: hostname:/ */
1342                        end = name + namelen;
1343        }
1344
1345        if (end <= name)
1346                return -EINVAL;
1347
1348        /* do dns_resolve upcall */
1349        ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
1350        if (ip_len > 0)
1351                ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
1352        else
1353                ret = -ESRCH;
1354
1355        kfree(ip_addr);
1356
1357        *ipend = end;
1358
1359        pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
1360                        ret, ret ? "failed" : ceph_pr_addr(ss));
1361
1362        return ret;
1363}
1364#else
1365static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
1366                struct sockaddr_storage *ss, char delim, const char **ipend)
1367{
1368        return -EINVAL;
1369}
1370#endif
1371
1372/*
1373 * Parse a server name (IP or hostname). If a valid IP address is not found
1374 * then try to extract a hostname to resolve using userspace DNS upcall.
1375 */
1376static int ceph_parse_server_name(const char *name, size_t namelen,
1377                        struct sockaddr_storage *ss, char delim, const char **ipend)
1378{
1379        int ret;
1380
1381        ret = ceph_pton(name, namelen, ss, delim, ipend);
1382        if (ret)
1383                ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1384
1385        return ret;
1386}
1387
1388/*
1389 * Parse an ip[:port] list into an addr array.  Use the default
1390 * monitor port if a port isn't specified.
1391 */
1392int ceph_parse_ips(const char *c, const char *end,
1393                   struct ceph_entity_addr *addr,
1394                   int max_count, int *count)
1395{
1396        int i, ret = -EINVAL;
1397        const char *p = c;
1398
1399        dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1400        for (i = 0; i < max_count; i++) {
1401                const char *ipend;
1402                struct sockaddr_storage *ss = &addr[i].in_addr;
1403                int port;
1404                char delim = ',';
1405
1406                if (*p == '[') {
1407                        delim = ']';
1408                        p++;
1409                }
1410
1411                ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1412                if (ret)
1413                        goto bad;
1414                ret = -EINVAL;
1415
1416                p = ipend;
1417
1418                if (delim == ']') {
1419                        if (*p != ']') {
1420                                dout("missing matching ']'\n");
1421                                goto bad;
1422                        }
1423                        p++;
1424                }
1425
1426                /* port? */
1427                if (p < end && *p == ':') {
1428                        port = 0;
1429                        p++;
1430                        while (p < end && *p >= '0' && *p <= '9') {
1431                                port = (port * 10) + (*p - '0');
1432                                p++;
1433                        }
1434                        if (port > 65535 || port == 0)
1435                                goto bad;
1436                } else {
1437                        port = CEPH_MON_PORT;
1438                }
1439
1440                addr_set_port(ss, port);
1441
1442                dout("parse_ips got %s\n", ceph_pr_addr(ss));
1443
1444                if (p == end)
1445                        break;
1446                if (*p != ',')
1447                        goto bad;
1448                p++;
1449        }
1450
1451        if (p != end)
1452                goto bad;
1453
1454        if (count)
1455                *count = i + 1;
1456        return 0;
1457
1458bad:
1459        pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
1460        return ret;
1461}
1462EXPORT_SYMBOL(ceph_parse_ips);
1463
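/*
 * Process the peer's banner: verify it, check that we actually reached
 * the peer we intended, and learn our own externally visible address if
 * we did not already know it.
 */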
1464static int process_banner(struct ceph_connection *con)
1465{
1466        dout("process_banner on %p\n", con);
1467
1468        if (verify_hello(con) < 0)
1469                return -1;
1470
1471        ceph_decode_addr(&con->actual_peer_addr);
1472        ceph_decode_addr(&con->peer_addr_for_me);
1473
1474        /*
1475         * Make sure the other end is who we wanted.  note that the other
1476         * end may not yet know their ip address, so if it's 0.0.0.0, give
1477         * them the benefit of the doubt.
1478         */
1479        if (memcmp(&con->peer_addr, &con->actual_peer_addr,
1480                   sizeof(con->peer_addr)) != 0 &&
1481            !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
1482              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
1483                pr_warning("wrong peer, want %s/%d, got %s/%d\n",
1484                           ceph_pr_addr(&con->peer_addr.in_addr),
1485                           (int)le32_to_cpu(con->peer_addr.nonce),
1486                           ceph_pr_addr(&con->actual_peer_addr.in_addr),
1487                           (int)le32_to_cpu(con->actual_peer_addr.nonce));
1488                con->error_msg = "wrong peer at address";
1489                return -1;
1490        }
1491
1492        /*
1493         * did we learn our address?
1494         */
1495        if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
1496                int port = addr_port(&con->msgr->inst.addr.in_addr);
1497
1498                memcpy(&con->msgr->inst.addr.in_addr,
1499                       &con->peer_addr_for_me.in_addr,
1500                       sizeof(con->peer_addr_for_me.in_addr));
1501                addr_set_port(&con->msgr->inst.addr.in_addr, port);
1502                encode_my_addr(con->msgr);
1503                dout("process_banner learned my addr is %s\n",
1504                     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
1505        }
1506
1507        return 0;
1508}
1509
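/*
 * Handle the peer's reply to our connect request.  Depending on the
 * reply tag we either fail the connection (feature, protocol version or
 * authorizer mismatch), retry the handshake with updated connect/global
 * seqs, reset the session, or move the connection to OPEN.
 */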
1510static int process_connect(struct ceph_connection *con)
1511{
1512        u64 sup_feat = con->msgr->supported_features;
1513        u64 req_feat = con->msgr->required_features;
1514        u64 server_feat = le64_to_cpu(con->in_reply.features);
1515        int ret;
1516
1517        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1518
1519        switch (con->in_reply.tag) {
1520        case CEPH_MSGR_TAG_FEATURES:
1521                pr_err("%s%lld %s feature set mismatch,"
1522                       " my %llx < server's %llx, missing %llx\n",
1523                       ENTITY_NAME(con->peer_name),
1524                       ceph_pr_addr(&con->peer_addr.in_addr),
1525                       sup_feat, server_feat, server_feat & ~sup_feat);
1526                con->error_msg = "missing required protocol features";
1527                reset_connection(con);
1528                return -1;
1529
1530        case CEPH_MSGR_TAG_BADPROTOVER:
1531                pr_err("%s%lld %s protocol version mismatch,"
1532                       " my %d != server's %d\n",
1533                       ENTITY_NAME(con->peer_name),
1534                       ceph_pr_addr(&con->peer_addr.in_addr),
1535                       le32_to_cpu(con->out_connect.protocol_version),
1536                       le32_to_cpu(con->in_reply.protocol_version));
1537                con->error_msg = "protocol version mismatch";
1538                reset_connection(con);
1539                return -1;
1540
1541        case CEPH_MSGR_TAG_BADAUTHORIZER:
1542                con->auth_retry++;
1543                dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
1544                     con->auth_retry);
1545                if (con->auth_retry == 2) {
1546                        con->error_msg = "connect authorization failure";
1547                        return -1;
1548                }
1549                con->auth_retry = 1;
1550                con_out_kvec_reset(con);
1551                ret = prepare_write_connect(con);
1552                if (ret < 0)
1553                        return ret;
1554                prepare_read_connect(con);
1555                break;
1556
1557        case CEPH_MSGR_TAG_RESETSESSION:
1558                /*
1559                 * If we connected with a large connect_seq but the peer
1560                 * has no record of a session with us (no connection, or
  1561                 * connect_seq == 0), they will send RESETSESSION to indicate
1562                 * that they must have reset their session, and may have
1563                 * dropped messages.
1564                 */
1565                dout("process_connect got RESET peer seq %u\n",
1566                     le32_to_cpu(con->in_reply.connect_seq));
1567                pr_err("%s%lld %s connection reset\n",
1568                       ENTITY_NAME(con->peer_name),
1569                       ceph_pr_addr(&con->peer_addr.in_addr));
1570                reset_connection(con);
1571                con_out_kvec_reset(con);
1572                ret = prepare_write_connect(con);
1573                if (ret < 0)
1574                        return ret;
1575                prepare_read_connect(con);
1576
1577                /* Tell ceph about it. */
1578                mutex_unlock(&con->mutex);
1579                pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1580                if (con->ops->peer_reset)
1581                        con->ops->peer_reset(con);
1582                mutex_lock(&con->mutex);
1583                if (con->state != CON_STATE_NEGOTIATING)
1584                        return -EAGAIN;
1585                break;
1586
1587        case CEPH_MSGR_TAG_RETRY_SESSION:
1588                /*
1589                 * If we sent a smaller connect_seq than the peer has, try
1590                 * again with a larger value.
1591                 */
1592                dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
1593                     le32_to_cpu(con->out_connect.connect_seq),
1594                     le32_to_cpu(con->in_reply.connect_seq));
1595                con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
1596                con_out_kvec_reset(con);
1597                ret = prepare_write_connect(con);
1598                if (ret < 0)
1599                        return ret;
1600                prepare_read_connect(con);
1601                break;
1602
1603        case CEPH_MSGR_TAG_RETRY_GLOBAL:
1604                /*
1605                 * If we sent a smaller global_seq than the peer has, try
1606                 * again with a larger value.
1607                 */
1608                dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1609                     con->peer_global_seq,
1610                     le32_to_cpu(con->in_reply.global_seq));
1611                get_global_seq(con->msgr,
1612                               le32_to_cpu(con->in_reply.global_seq));
1613                con_out_kvec_reset(con);
1614                ret = prepare_write_connect(con);
1615                if (ret < 0)
1616                        return ret;
1617                prepare_read_connect(con);
1618                break;
1619
1620        case CEPH_MSGR_TAG_READY:
1621                if (req_feat & ~server_feat) {
1622                        pr_err("%s%lld %s protocol feature mismatch,"
1623                               " my required %llx > server's %llx, need %llx\n",
1624                               ENTITY_NAME(con->peer_name),
1625                               ceph_pr_addr(&con->peer_addr.in_addr),
1626                               req_feat, server_feat, req_feat & ~server_feat);
1627                        con->error_msg = "missing required protocol features";
1628                        reset_connection(con);
1629                        return -1;
1630                }
1631
1632                WARN_ON(con->state != CON_STATE_NEGOTIATING);
1633                con->state = CON_STATE_OPEN;
1634
1635                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1636                con->connect_seq++;
1637                con->peer_features = server_feat;
1638                dout("process_connect got READY gseq %d cseq %d (%d)\n",
1639                     con->peer_global_seq,
1640                     le32_to_cpu(con->in_reply.connect_seq),
1641                     con->connect_seq);
1642                WARN_ON(con->connect_seq !=
1643                        le32_to_cpu(con->in_reply.connect_seq));
1644
1645                if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1646                        set_bit(CON_FLAG_LOSSYTX, &con->flags);
1647
1648                con->delay = 0;      /* reset backoff memory */
1649
1650                prepare_read_tag(con);
1651                break;
1652
1653        case CEPH_MSGR_TAG_WAIT:
1654                /*
1655                 * If there is a connection race (we are opening
1656                 * connections to each other), one of us may just have
1657                 * to WAIT.  This shouldn't happen if we are the
1658                 * client.
1659                 */
1660                pr_err("process_connect got WAIT as client\n");
1661                con->error_msg = "protocol error, got WAIT as client";
1662                return -1;
1663
1664        default:
1665                pr_err("connect protocol error, will retry\n");
1666                con->error_msg = "protocol error, garbage tag during connect";
1667                return -1;
1668        }
1669        return 0;
1670}
1671
1672
1673/*
1674 * read (part of) an ack
1675 */
1676static int read_partial_ack(struct ceph_connection *con)
1677{
1678        int size = sizeof (con->in_temp_ack);
1679        int end = size;
1680
1681        return read_partial(con, end, size, &con->in_temp_ack);
1682}
1683
1684
1685/*
1686 * We can finally discard anything that's been acked.
1687 */
1688static void process_ack(struct ceph_connection *con)
1689{
1690        struct ceph_msg *m;
1691        u64 ack = le64_to_cpu(con->in_temp_ack);
1692        u64 seq;
1693
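            /* out_sent is kept in seq order, so stop at the first unacked msg */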
1694        while (!list_empty(&con->out_sent)) {
1695                m = list_first_entry(&con->out_sent, struct ceph_msg,
1696                                     list_head);
1697                seq = le64_to_cpu(m->hdr.seq);
1698                if (seq > ack)
1699                        break;
1700                dout("got ack for seq %llu type %d at %p\n", seq,
1701                     le16_to_cpu(m->hdr.type), m);
1702                m->ack_stamp = jiffies;
1703                ceph_msg_remove(m);
1704        }
1705        prepare_read_tag(con);
1706}
1707
1708
1709
1710
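    /*
     * Read (more of) one section of a message (front or middle) into
     * the given kvec.  Once the section is complete its crc is
     * computed into *crc.  Returns 1 when the section is fully read,
     * or the (<= 0) result of the socket read otherwise.
     */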
1711static int read_partial_message_section(struct ceph_connection *con,
1712                                        struct kvec *section,
1713                                        unsigned int sec_len, u32 *crc)
1714{
1715        int ret, left;
1716
1717        BUG_ON(!section);
1718
1719        while (section->iov_len < sec_len) {
1720                BUG_ON(section->iov_base == NULL);
1721                left = sec_len - section->iov_len;
1722                ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
1723                                       section->iov_len, left);
1724                if (ret <= 0)
1725                        return ret;
1726                section->iov_len += ret;
1727        }
1728        if (section->iov_len == sec_len)
1729                *crc = crc32c(0, section->iov_base, section->iov_len);
1730
1731        return 1;
1732}
1733
1734static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
1735
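    /*
     * Receive (part of) the message data payload into the message's
     * page vector.  At most the remainder of the current page is read
     * per call; the running data crc and in_msg_pos are updated, and
     * the caller loops until data_len bytes have arrived.
     */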
1736static int read_partial_message_pages(struct ceph_connection *con,
1737                                      struct page **pages,
1738                                      unsigned int data_len, bool do_datacrc)
1739{
1740        void *p;
1741        int ret;
1742        int left;
1743
1744        left = min((int)(data_len - con->in_msg_pos.data_pos),
1745                   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1746        /* (page) data */
1747        BUG_ON(pages == NULL);
1748        p = kmap(pages[con->in_msg_pos.page]);
1749        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1750                               left);
1751        if (ret > 0 && do_datacrc)
1752                con->in_data_crc =
1753                        crc32c(con->in_data_crc,
1754                                  p + con->in_msg_pos.page_pos, ret);
1755        kunmap(pages[con->in_msg_pos.page]);
1756        if (ret <= 0)
1757                return ret;
1758        con->in_msg_pos.data_pos += ret;
1759        con->in_msg_pos.page_pos += ret;
1760        if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1761                con->in_msg_pos.page_pos = 0;
1762                con->in_msg_pos.page++;
1763        }
1764
1765        return ret;
1766}
1767
1768#ifdef CONFIG_BLOCK
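    /*
     * As read_partial_message_pages(), but the destination is a bio:
     * fill the current bio_vec and advance to the next segment once
     * it is full.
     */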
1769static int read_partial_message_bio(struct ceph_connection *con,
1770                                    struct bio **bio_iter, int *bio_seg,
1771                                    unsigned int data_len, bool do_datacrc)
1772{
1773        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1774        void *p;
1775        int ret, left;
1776
1777        left = min((int)(data_len - con->in_msg_pos.data_pos),
1778                   (int)(bv->bv_len - con->in_msg_pos.page_pos));
1779
1780        p = kmap(bv->bv_page) + bv->bv_offset;
1781
1782        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1783                               left);
1784        if (ret > 0 && do_datacrc)
1785                con->in_data_crc =
1786                        crc32c(con->in_data_crc,
1787                                  p + con->in_msg_pos.page_pos, ret);
1788        kunmap(bv->bv_page);
1789        if (ret <= 0)
1790                return ret;
1791        con->in_msg_pos.data_pos += ret;
1792        con->in_msg_pos.page_pos += ret;
1793        if (con->in_msg_pos.page_pos == bv->bv_len) {
1794                con->in_msg_pos.page_pos = 0;
1795                iter_bio_next(bio_iter, bio_seg);
1796        }
1797
1798        return ret;
1799}
1800#endif
1801
1802/*
1803 * read (part of) a message.
1804 */
1805static int read_partial_message(struct ceph_connection *con)
1806{
1807        struct ceph_msg *m = con->in_msg;
1808        int size;
1809        int end;
1810        int ret;
1811        unsigned int front_len, middle_len, data_len;
1812        bool do_datacrc = !con->msgr->nocrc;
1813        u64 seq;
1814        u32 crc;
1815
1816        dout("read_partial_message con %p msg %p\n", con, m);
1817
1818        /* header */
1819        size = sizeof (con->in_hdr);
1820        end = size;
1821        ret = read_partial(con, end, size, &con->in_hdr);
1822        if (ret <= 0)
1823                return ret;
1824
1825        crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
1826        if (cpu_to_le32(crc) != con->in_hdr.crc) {
1827                pr_err("read_partial_message bad hdr"
1828                       " crc %u != expected %u\n",
1829                       crc, con->in_hdr.crc);
1830                return -EBADMSG;
1831        }
1832
1833        front_len = le32_to_cpu(con->in_hdr.front_len);
1834        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1835                return -EIO;
1836        middle_len = le32_to_cpu(con->in_hdr.middle_len);
1837        if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1838                return -EIO;
1839        data_len = le32_to_cpu(con->in_hdr.data_len);
1840        if (data_len > CEPH_MSG_MAX_DATA_LEN)
1841                return -EIO;
1842
1843        /* verify seq# */
1844        seq = le64_to_cpu(con->in_hdr.seq);
1845        if ((s64)seq - (s64)con->in_seq < 1) {
1846                pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1847                        ENTITY_NAME(con->peer_name),
1848                        ceph_pr_addr(&con->peer_addr.in_addr),
1849                        seq, con->in_seq + 1);
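                    /*
                     * A negative in_base_pos tells try_read() to read and
                     * discard this many bytes before looking for the next tag.
                     */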
1850                con->in_base_pos = -front_len - middle_len - data_len -
1851                        sizeof(m->footer);
1852                con->in_tag = CEPH_MSGR_TAG_READY;
1853                return 0;
1854        } else if ((s64)seq - (s64)con->in_seq > 1) {
1855                pr_err("read_partial_message bad seq %lld expected %lld\n",
1856                       seq, con->in_seq + 1);
1857                con->error_msg = "bad message sequence # for incoming message";
1858                return -EBADMSG;
1859        }
1860
1861        /* allocate message? */
1862        if (!con->in_msg) {
1863                int skip = 0;
1864
1865                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1866                     con->in_hdr.front_len, con->in_hdr.data_len);
1867                ret = ceph_con_in_msg_alloc(con, &skip);
1868                if (ret < 0)
1869                        return ret;
1870                if (skip) {
1871                        /* skip this message */
1872                        dout("alloc_msg said skip message\n");
1873                        BUG_ON(con->in_msg);
1874                        con->in_base_pos = -front_len - middle_len - data_len -
1875                                sizeof(m->footer);
1876                        con->in_tag = CEPH_MSGR_TAG_READY;
1877                        con->in_seq++;
1878                        return 0;
1879                }
1880
1881                BUG_ON(!con->in_msg);
1882                BUG_ON(con->in_msg->con != con);
1883                m = con->in_msg;
1884                m->front.iov_len = 0;    /* haven't read it yet */
1885                if (m->middle)
1886                        m->middle->vec.iov_len = 0;
1887
1888                con->in_msg_pos.page = 0;
1889                if (m->pages)
1890                        con->in_msg_pos.page_pos = m->page_alignment;
1891                else
1892                        con->in_msg_pos.page_pos = 0;
1893                con->in_msg_pos.data_pos = 0;
1894
1895#ifdef CONFIG_BLOCK
1896                if (m->bio)
1897                        init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1898#endif
1899        }
1900
1901        /* front */
1902        ret = read_partial_message_section(con, &m->front, front_len,
1903                                           &con->in_front_crc);
1904        if (ret <= 0)
1905                return ret;
1906
1907        /* middle */
1908        if (m->middle) {
1909                ret = read_partial_message_section(con, &m->middle->vec,
1910                                                   middle_len,
1911                                                   &con->in_middle_crc);
1912                if (ret <= 0)
1913                        return ret;
1914        }
1915
1916        /* (page) data */
1917        while (con->in_msg_pos.data_pos < data_len) {
1918                if (m->pages) {
1919                        ret = read_partial_message_pages(con, m->pages,
1920                                                 data_len, do_datacrc);
1921                        if (ret <= 0)
1922                                return ret;
1923#ifdef CONFIG_BLOCK
1924                } else if (m->bio) {
1925                        BUG_ON(!m->bio_iter);
1926                        ret = read_partial_message_bio(con,
1927                                                 &m->bio_iter, &m->bio_seg,
1928                                                 data_len, do_datacrc);
1929                        if (ret <= 0)
1930                                return ret;
1931#endif
1932                } else {
1933                        BUG_ON(1);
1934                }
1935        }
1936
1937        /* footer */
1938        size = sizeof (m->footer);
1939        end += size;
1940        ret = read_partial(con, end, size, &m->footer);
1941        if (ret <= 0)
1942                return ret;
1943
1944        dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1945             m, front_len, m->footer.front_crc, middle_len,
1946             m->footer.middle_crc, data_len, m->footer.data_crc);
1947
1948        /* crc ok? */
1949        if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1950                pr_err("read_partial_message %p front crc %u != exp. %u\n",
1951                       m, con->in_front_crc, m->footer.front_crc);
1952                return -EBADMSG;
1953        }
1954        if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1955                pr_err("read_partial_message %p middle crc %u != exp %u\n",
1956                       m, con->in_middle_crc, m->footer.middle_crc);
1957                return -EBADMSG;
1958        }
1959        if (do_datacrc &&
1960            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1961            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1962                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1963                       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1964                return -EBADMSG;
1965        }
1966
1967        return 1; /* done! */
1968}
1969
1970/*
1971 * Process message.  This happens in the worker thread.  The callback should
1972 * be careful not to do anything that waits on other incoming messages or it
1973 * may deadlock.
1974 */
1975static void process_message(struct ceph_connection *con)
1976{
1977        struct ceph_msg *msg;
1978
1979        BUG_ON(con->in_msg->con != con);
1980        con->in_msg->con = NULL;
1981        msg = con->in_msg;
1982        con->in_msg = NULL;
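            /* drop the con reference in_msg held (see ceph_con_in_msg_alloc) */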
1983        con->ops->put(con);
1984
1985        /* if first message, set peer_name */
1986        if (con->peer_name.type == 0)
1987                con->peer_name = msg->hdr.src;
1988
1989        con->in_seq++;
1990        mutex_unlock(&con->mutex);
1991
1992        dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1993             msg, le64_to_cpu(msg->hdr.seq),
1994             ENTITY_NAME(msg->hdr.src),
1995             le16_to_cpu(msg->hdr.type),
1996             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1997             le32_to_cpu(msg->hdr.front_len),
1998             le32_to_cpu(msg->hdr.data_len),
1999             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
2000        con->ops->dispatch(con, msg);
2001
2002        mutex_lock(&con->mutex);
2003}
2004
2005
2006/*
2007 * Write something to the socket.  Called in a worker thread when the
2008 * socket appears to be writeable and we have something ready to send.
2009 */
2010static int try_write(struct ceph_connection *con)
2011{
2012        int ret = 1;
2013
2014        dout("try_write start %p state %lu\n", con, con->state);
2015
2016more:
2017        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
2018
2019        /* open the socket first? */
2020        if (con->state == CON_STATE_PREOPEN) {
2021                BUG_ON(con->sock);
2022                con->state = CON_STATE_CONNECTING;
2023
2024                con_out_kvec_reset(con);
2025                prepare_write_banner(con);
2026                prepare_read_banner(con);
2027
2028                BUG_ON(con->in_msg);
2029                con->in_tag = CEPH_MSGR_TAG_READY;
2030                dout("try_write initiating connect on %p new state %lu\n",
2031                     con, con->state);
2032                ret = ceph_tcp_connect(con);
2033                if (ret < 0) {
2034                        con->error_msg = "connect error";
2035                        goto out;
2036                }
2037        }
2038
2039more_kvec:
2040        /* kvec data queued? */
2041        if (con->out_skip) {
2042                ret = write_partial_skip(con);
2043                if (ret <= 0)
2044                        goto out;
2045        }
2046        if (con->out_kvec_left) {
2047                ret = write_partial_kvec(con);
2048                if (ret <= 0)
2049                        goto out;
2050        }
2051
2052        /* msg pages? */
2053        if (con->out_msg) {
2054                if (con->out_msg_done) {
2055                        ceph_msg_put(con->out_msg);
2056                        con->out_msg = NULL;   /* we're done with this one */
2057                        goto do_next;
2058                }
2059
2060                ret = write_partial_msg_pages(con);
2061                if (ret == 1)
2062                        goto more_kvec;  /* we need to send the footer, too! */
2063                if (ret == 0)
2064                        goto out;
2065                if (ret < 0) {
2066                        dout("try_write write_partial_msg_pages err %d\n",
2067                             ret);
2068                        goto out;
2069                }
2070        }
2071
2072do_next:
2073        if (con->state == CON_STATE_OPEN) {
2074                /* is anything else pending? */
2075                if (!list_empty(&con->out_queue)) {
2076                        prepare_write_message(con);
2077                        goto more;
2078                }
2079                if (con->in_seq > con->in_seq_acked) {
2080                        prepare_write_ack(con);
2081                        goto more;
2082                }
2083                if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
2084                                       &con->flags)) {
2085                        prepare_write_keepalive(con);
2086                        goto more;
2087                }
2088        }
2089
2090        /* Nothing to do! */
2091        clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
2092        dout("try_write nothing else to write.\n");
2093        ret = 0;
2094out:
2095        dout("try_write done on %p ret %d\n", con, ret);
2096        return ret;
2097}
2098
2099
2100
2101/*
2102 * Read what we can from the socket.
2103 */
2104static int try_read(struct ceph_connection *con)
2105{
2106        int ret = -1;
2107
2108more:
2109        dout("try_read start on %p state %lu\n", con, con->state);
2110        if (con->state != CON_STATE_CONNECTING &&
2111            con->state != CON_STATE_NEGOTIATING &&
2112            con->state != CON_STATE_OPEN)
2113                return 0;
2114
2115        BUG_ON(!con->sock);
2116
2117        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
2118             con->in_base_pos);
2119
2120        if (con->state == CON_STATE_CONNECTING) {
2121                dout("try_read connecting\n");
2122                ret = read_partial_banner(con);
2123                if (ret <= 0)
2124                        goto out;
2125                ret = process_banner(con);
2126                if (ret < 0)
2127                        goto out;
2128
2129                con->state = CON_STATE_NEGOTIATING;
2130
2131                /*
2132                 * Received banner is good, exchange connection info.
2133                 * Do not reset out_kvec, as sending our banner raced
2134                 * with receiving peer banner after connect completed.
2135                 */
2136                ret = prepare_write_connect(con);
2137                if (ret < 0)
2138                        goto out;
2139                prepare_read_connect(con);
2140
2141                /* Send connection info before awaiting response */
2142                goto out;
2143        }
2144
2145        if (con->state == CON_STATE_NEGOTIATING) {
2146                dout("try_read negotiating\n");
2147                ret = read_partial_connect(con);
2148                if (ret <= 0)
2149                        goto out;
2150                ret = process_connect(con);
2151                if (ret < 0)
2152                        goto out;
2153                goto more;
2154        }
2155
2156        WARN_ON(con->state != CON_STATE_OPEN);
2157
2158        if (con->in_base_pos < 0) {
2159                /*
2160                 * skipping + discarding content.
2161                 *
2162                 * FIXME: there must be a better way to do this!
2163                 */
2164                static char buf[SKIP_BUF_SIZE];
2165                int skip = min((int) sizeof (buf), -con->in_base_pos);
2166
2167                dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
2168                ret = ceph_tcp_recvmsg(con->sock, buf, skip);
2169                if (ret <= 0)
2170                        goto out;
2171                con->in_base_pos += ret;
2172                if (con->in_base_pos)
2173                        goto more;
2174        }
2175        if (con->in_tag == CEPH_MSGR_TAG_READY) {
2176                /*
2177                 * what's next?
2178                 */
2179                ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
2180                if (ret <= 0)
2181                        goto out;
2182                dout("try_read got tag %d\n", (int)con->in_tag);
2183                switch (con->in_tag) {
2184                case CEPH_MSGR_TAG_MSG:
2185                        prepare_read_message(con);
2186                        break;
2187                case CEPH_MSGR_TAG_ACK:
2188                        prepare_read_ack(con);
2189                        break;
2190                case CEPH_MSGR_TAG_CLOSE:
2191                        con_close_socket(con);
2192                        con->state = CON_STATE_CLOSED;
2193                        goto out;
2194                default:
2195                        goto bad_tag;
2196                }
2197        }
2198        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
2199                ret = read_partial_message(con);
2200                if (ret <= 0) {
2201                        switch (ret) {
2202                        case -EBADMSG:
2203                                con->error_msg = "bad crc";
2204                                ret = -EIO;
2205                                break;
2206                        case -EIO:
2207                                con->error_msg = "io error";
2208                                break;
2209                        }
2210                        goto out;
2211                }
2212                if (con->in_tag == CEPH_MSGR_TAG_READY)
2213                        goto more;
2214                process_message(con);
2215                if (con->state == CON_STATE_OPEN)
2216                        prepare_read_tag(con);
2217                goto more;
2218        }
2219        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
2220                ret = read_partial_ack(con);
2221                if (ret <= 0)
2222                        goto out;
2223                process_ack(con);
2224                goto more;
2225        }
2226
2227out:
2228        dout("try_read done on %p ret %d\n", con, ret);
2229        return ret;
2230
2231bad_tag:
2232        pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
2233        con->error_msg = "protocol error, garbage tag";
2234        ret = -1;
2235        goto out;
2236}
2237
2238
2239/*
2240 * Atomically queue work on a connection.  Bump @con reference to
2241 * avoid races with connection teardown.
2242 */
2243static void queue_con(struct ceph_connection *con)
2244{
2245        if (!con->ops->get(con)) {
2246                dout("queue_con %p ref count 0\n", con);
2247                return;
2248        }
2249
2250        if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
2251                dout("queue_con %p - already queued\n", con);
2252                con->ops->put(con);
2253        } else {
2254                dout("queue_con %p\n", con);
2255        }
2256}
2257
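    /*
     * Check for (and consume) a pending socket-closed event, recording
     * an error message that names the connection state it happened in.
     * Returns true if the caller should take the fault path.
     */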
2258static bool con_sock_closed(struct ceph_connection *con)
2259{
2260        if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
2261                return false;
2262
2263#define CASE(x)                                                         \
2264        case CON_STATE_ ## x:                                           \
2265                con->error_msg = "socket closed (con state " #x ")";    \
2266                break;
2267
2268        switch (con->state) {
2269        CASE(CLOSED);
2270        CASE(PREOPEN);
2271        CASE(CONNECTING);
2272        CASE(NEGOTIATING);
2273        CASE(OPEN);
2274        CASE(STANDBY);
2275        default:
2276                pr_warning("%s con %p unrecognized state %lu\n",
2277                        __func__, con, con->state);
2278                con->error_msg = "unrecognized con state";
2279                BUG();
2280                break;
2281        }
2282#undef CASE
2283
2284        return true;
2285}
2286
2287/*
2288 * Do some work on a connection.  Drop a connection ref when we're done.
2289 */
2290static void con_work(struct work_struct *work)
2291{
2292        struct ceph_connection *con = container_of(work, struct ceph_connection,
2293                                                   work.work);
2294        int ret;
2295
2296        mutex_lock(&con->mutex);
2297restart:
2298        if (con_sock_closed(con))
2299                goto fault;
2300
2301        if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2302                dout("con_work %p backing off\n", con);
2303                if (queue_delayed_work(ceph_msgr_wq, &con->work,
2304                                       round_jiffies_relative(con->delay))) {
2305                        dout("con_work %p backoff %lu\n", con, con->delay);
2306                        mutex_unlock(&con->mutex);
2307                        return;
2308                } else {
2309                        dout("con_work %p FAILED to back off %lu\n", con,
2310                             con->delay);
2311                        set_bit(CON_FLAG_BACKOFF, &con->flags);
2312                }
2313                goto done;
2314        }
2315
2316        if (con->state == CON_STATE_STANDBY) {
2317                dout("con_work %p STANDBY\n", con);
2318                goto done;
2319        }
2320        if (con->state == CON_STATE_CLOSED) {
2321                dout("con_work %p CLOSED\n", con);
2322                BUG_ON(con->sock);
2323                goto done;
2324        }
2325        if (con->state == CON_STATE_PREOPEN) {
2326                dout("con_work OPENING\n");
2327                BUG_ON(con->sock);
2328        }
2329
2330        ret = try_read(con);
2331        if (ret == -EAGAIN)
2332                goto restart;
2333        if (ret < 0) {
2334                con->error_msg = "socket error on read";
2335                goto fault;
2336        }
2337
2338        ret = try_write(con);
2339        if (ret == -EAGAIN)
2340                goto restart;
2341        if (ret < 0) {
2342                con->error_msg = "socket error on write";
2343                goto fault;
2344        }
2345
2346done:
2347        mutex_unlock(&con->mutex);
2348done_unlocked:
2349        con->ops->put(con);
2350        return;
2351
2352fault:
2353        ceph_fault(con);     /* error/fault path */
2354        goto done_unlocked;
2355}
2356
2357
2358/*
2359 * Generic error/fault handler.  A retry mechanism is used with
2360 * exponential backoff.
2361 */
2362static void ceph_fault(struct ceph_connection *con)
2363        __releases(con->mutex)
2364{
2365        pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2366               ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2367        dout("fault %p state %lu to peer %s\n",
2368             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2369
2370        WARN_ON(con->state != CON_STATE_CONNECTING &&
2371               con->state != CON_STATE_NEGOTIATING &&
2372               con->state != CON_STATE_OPEN);
2373
2374        con_close_socket(con);
2375
2376        if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
2377                dout("fault on LOSSYTX channel, marking CLOSED\n");
2378                con->state = CON_STATE_CLOSED;
2379                goto out_unlock;
2380        }
2381
2382        if (con->in_msg) {
2383                BUG_ON(con->in_msg->con != con);
2384                con->in_msg->con = NULL;
2385                ceph_msg_put(con->in_msg);
2386                con->in_msg = NULL;
2387                con->ops->put(con);
2388        }
2389
2390        /* Requeue anything that hasn't been acked */
2391        list_splice_init(&con->out_sent, &con->out_queue);
2392
2393        /* If there are no messages queued or keepalive pending, place
2394         * the connection in a STANDBY state */
2395        if (list_empty(&con->out_queue) &&
2396            !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
2397                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2398                clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
2399                con->state = CON_STATE_STANDBY;
2400        } else {
2401                /* retry after a delay. */
2402                con->state = CON_STATE_PREOPEN;
2403                if (con->delay == 0)
2404                        con->delay = BASE_DELAY_INTERVAL;
2405                else if (con->delay < MAX_DELAY_INTERVAL)
2406                        con->delay *= 2;
2407                con->ops->get(con);
2408                if (queue_delayed_work(ceph_msgr_wq, &con->work,
2409                                       round_jiffies_relative(con->delay))) {
2410                        dout("fault queued %p delay %lu\n", con, con->delay);
2411                } else {
2412                        con->ops->put(con);
2413                        dout("fault failed to queue %p delay %lu, backoff\n",
2414                             con, con->delay);
2415                        /*
2416                         * In many cases we see a socket state change
2417                         * while con_work is running and end up
2418                         * queuing (non-delayed) work, such that we
2419                         * can't backoff with a delay.  Set a flag so
2420                         * that when con_work restarts we schedule the
2421                         * delay then.
2422                         */
2423                        set_bit(CON_FLAG_BACKOFF, &con->flags);
2424                }
2425        }
2426
2427out_unlock:
2428        mutex_unlock(&con->mutex);
2429        /*
2430         * in case we faulted due to authentication, invalidate our
2431         * current tickets so that we can get new ones.
2432         */
2433        if (con->auth_retry && con->ops->invalidate_authorizer) {
2434                dout("calling invalidate_authorizer()\n");
2435                con->ops->invalidate_authorizer(con);
2436        }
2437
2438        if (con->ops->fault)
2439                con->ops->fault(con);
2440}
2441
2442
2443
2444/*
2445 * initialize a new messenger instance
2446 */
2447void ceph_messenger_init(struct ceph_messenger *msgr,
2448                        struct ceph_entity_addr *myaddr,
2449                        u32 supported_features,
2450                        u32 required_features,
2451                        bool nocrc)
2452{
2453        msgr->supported_features = supported_features;
2454        msgr->required_features = required_features;
2455
2456        spin_lock_init(&msgr->global_seq_lock);
2457
2458        if (myaddr)
2459                msgr->inst.addr = *myaddr;
2460
2461        /* select a random nonce */
2462        msgr->inst.addr.type = 0;
2463        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2464        encode_my_addr(msgr);
2465        msgr->nocrc = nocrc;
2466
2467        atomic_set(&msgr->stopping, 0);
2468
2469        dout("%s %p\n", __func__, msgr);
2470}
2471EXPORT_SYMBOL(ceph_messenger_init);
2472
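    /*
     * A connection parked in STANDBY (see ceph_fault()) is moved back
     * to PREOPEN, and connect_seq is bumped, so that queuing a message
     * or keepalive reopens the session.
     */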
2473static void clear_standby(struct ceph_connection *con)
2474{
2475        /* come back from STANDBY? */
2476        if (con->state == CON_STATE_STANDBY) {
2477                dout("clear_standby %p and ++connect_seq\n", con);
2478                con->state = CON_STATE_PREOPEN;
2479                con->connect_seq++;
2480                WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
2481                WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
2482        }
2483}
2484
2485/*
2486 * Queue up an outgoing message on the given connection.
2487 */
2488void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2489{
2490        /* set src+dst */
2491        msg->hdr.src = con->msgr->inst.name;
2492        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
2493        msg->needs_out_seq = true;
2494
2495        mutex_lock(&con->mutex);
2496
2497        if (con->state == CON_STATE_CLOSED) {
2498                dout("con_send %p closed, dropping %p\n", con, msg);
2499                ceph_msg_put(msg);
2500                mutex_unlock(&con->mutex);
2501                return;
2502        }
2503
2504        BUG_ON(msg->con != NULL);
2505        msg->con = con->ops->get(con);
2506        BUG_ON(msg->con == NULL);
2507
2508        BUG_ON(!list_empty(&msg->list_head));
2509        list_add_tail(&msg->list_head, &con->out_queue);
2510        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
2511             ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
2512             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
2513             le32_to_cpu(msg->hdr.front_len),
2514             le32_to_cpu(msg->hdr.middle_len),
2515             le32_to_cpu(msg->hdr.data_len));
2516
2517        clear_standby(con);
2518        mutex_unlock(&con->mutex);
2519
2520        /* if there wasn't anything waiting to send before, queue
2521         * new work */
2522        if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2523                queue_con(con);
2524}
2525EXPORT_SYMBOL(ceph_con_send);
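
    /*
     * Minimal usage sketch (illustrative only, not part of this file);
     * "type", "front_len" and "payload" stand in for caller-specific
     * values:
     *
     *        struct ceph_msg *m;
     *
     *        m = ceph_msg_new(type, front_len, GFP_NOFS, false);
     *        if (!m)
     *                return -ENOMEM;
     *        memcpy(m->front.iov_base, payload, front_len);
     *        ceph_con_send(con, m);   (the messenger drops the msg ref when done)
     */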
2526
2527/*
2528 * Revoke a message that was previously queued for send
2529 */
2530void ceph_msg_revoke(struct ceph_msg *msg)
2531{
2532        struct ceph_connection *con = msg->con;
2533
2534        if (!con)
2535                return;         /* Message not in our possession */
2536
2537        mutex_lock(&con->mutex);
2538        if (!list_empty(&msg->list_head)) {
2539                dout("%s %p msg %p - was on queue\n", __func__, con, msg);
2540                list_del_init(&msg->list_head);
2541                BUG_ON(msg->con == NULL);
2542                msg->con->ops->put(msg->con);
2543                msg->con = NULL;
2544                msg->hdr.seq = 0;
2545
2546                ceph_msg_put(msg);
2547        }
2548        if (con->out_msg == msg) {
2549                dout("%s %p msg %p - was sending\n", __func__, con, msg);
2550                con->out_msg = NULL;
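                    /*
                     * If the kvec being written still belongs to this message,
                     * let try_write() route the remaining bytes through
                     * write_partial_skip() instead of sending them as message
                     * data.
                     */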
2551                if (con->out_kvec_is_msg) {
2552                        con->out_skip = con->out_kvec_bytes;
2553                        con->out_kvec_is_msg = false;
2554                }
2555                msg->hdr.seq = 0;
2556
2557                ceph_msg_put(msg);
2558        }
2559        mutex_unlock(&con->mutex);
2560}
2561
2562/*
2563 * Revoke a message that we may be reading data into
2564 */
2565void ceph_msg_revoke_incoming(struct ceph_msg *msg)
2566{
2567        struct ceph_connection *con;
2568
2569        BUG_ON(msg == NULL);
2570        if (!msg->con) {
2571                dout("%s msg %p null con\n", __func__, msg);
2572
2573                return;         /* Message not in our possession */
2574        }
2575
2576        con = msg->con;
2577        mutex_lock(&con->mutex);
2578        if (con->in_msg == msg) {
2579                unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
2580                unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
2581                unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
2582
2583                /* skip rest of message */
2584                dout("%s %p msg %p revoked\n", __func__, con, msg);
2585                con->in_base_pos = con->in_base_pos -
2586                                sizeof(struct ceph_msg_header) -
2587                                front_len -
2588                                middle_len -
2589                                data_len -
2590                                sizeof(struct ceph_msg_footer);
2591                ceph_msg_put(con->in_msg);
2592                con->in_msg = NULL;
2593                con->in_tag = CEPH_MSGR_TAG_READY;
2594                con->in_seq++;
2595        } else {
2596                dout("%s %p in_msg %p msg %p no-op\n",
2597                     __func__, con, con->in_msg, msg);
2598        }
2599        mutex_unlock(&con->mutex);
2600}
2601
2602/*
2603 * Queue a keepalive byte to ensure the tcp connection is alive.
2604 */
2605void ceph_con_keepalive(struct ceph_connection *con)
2606{
2607        dout("con_keepalive %p\n", con);
2608        mutex_lock(&con->mutex);
2609        clear_standby(con);
2610        mutex_unlock(&con->mutex);
2611        if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
2612            test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2613                queue_con(con);
2614}
2615EXPORT_SYMBOL(ceph_con_keepalive);
2616
2617
2618/*
2619 * construct a new message with the given type and front length;
2620 * the new msg has a ref count of 1.
2621 */
2622struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2623                              bool can_fail)
2624{
2625        struct ceph_msg *m;
2626
2627        m = kmalloc(sizeof(*m), flags);
2628        if (m == NULL)
2629                goto out;
2630        kref_init(&m->kref);
2631
2632        m->con = NULL;
2633        INIT_LIST_HEAD(&m->list_head);
2634
2635        m->hdr.tid = 0;
2636        m->hdr.type = cpu_to_le16(type);
2637        m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
2638        m->hdr.version = 0;
2639        m->hdr.front_len = cpu_to_le32(front_len);
2640        m->hdr.middle_len = 0;
2641        m->hdr.data_len = 0;
2642        m->hdr.data_off = 0;
2643        m->hdr.reserved = 0;
2644        m->footer.front_crc = 0;
2645        m->footer.middle_crc = 0;
2646        m->footer.data_crc = 0;
2647        m->footer.flags = 0;
2648        m->front_max = front_len;
2649        m->front_is_vmalloc = false;
2650        m->more_to_follow = false;
2651        m->ack_stamp = 0;
2652        m->pool = NULL;
2653
2654        /* middle */
2655        m->middle = NULL;
2656
2657        /* data */
2658        m->nr_pages = 0;
2659        m->page_alignment = 0;
2660        m->pages = NULL;
2661        m->pagelist = NULL;
2662        m->bio = NULL;
2663        m->bio_iter = NULL;
2664        m->bio_seg = 0;
2665        m->trail = NULL;
2666
2667        /* front */
2668        if (front_len) {
2669                if (front_len > PAGE_CACHE_SIZE) {
2670                        m->front.iov_base = __vmalloc(front_len, flags,
2671                                                      PAGE_KERNEL);
2672                        m->front_is_vmalloc = true;
2673                } else {
2674                        m->front.iov_base = kmalloc(front_len, flags);
2675                }
2676                if (m->front.iov_base == NULL) {
2677                        dout("ceph_msg_new can't allocate %d bytes\n",
2678                             front_len);
2679                        goto out2;
2680                }
2681        } else {
2682                m->front.iov_base = NULL;
2683        }
2684        m->front.iov_len = front_len;
2685
2686        dout("ceph_msg_new %p front %d\n", m, front_len);
2687        return m;
2688
2689out2:
2690        ceph_msg_put(m);
2691out:
2692        if (!can_fail) {
2693                pr_err("msg_new can't create type %d front %d\n", type,
2694                       front_len);
2695                WARN_ON(1);
2696        } else {
2697                dout("msg_new can't create type %d front %d\n", type,
2698                     front_len);
2699        }
2700        return NULL;
2701}
2702EXPORT_SYMBOL(ceph_msg_new);
2703
2704/*
2705 * Allocate "middle" portion of a message, if it is needed and wasn't
2706 * allocated by alloc_msg.  This allows us to read a small fixed-size
2707 * per-type header in the front and then gracefully fail (i.e.,
2708 * propagate the error to the caller based on info in the front) when
2709 * the middle is too large.
2710 */
2711static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2712{
2713        int type = le16_to_cpu(msg->hdr.type);
2714        int middle_len = le32_to_cpu(msg->hdr.middle_len);
2715
2716        dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
2717             ceph_msg_type_name(type), middle_len);
2718        BUG_ON(!middle_len);
2719        BUG_ON(msg->middle);
2720
2721        msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
2722        if (!msg->middle)
2723                return -ENOMEM;
2724        return 0;
2725}
2726
2727/*
2728 * Allocate a message for receiving an incoming message on a
2729 * connection, and save the result in con->in_msg.  Uses the
2730 * connection's private alloc_msg op if available.
2731 *
2732 * Returns 0 on success, or a negative error code.
2733 *
2734 * On success, if we set *skip = 1:
2735 *  - the next message should be skipped and ignored.
2736 *  - con->in_msg == NULL
2737 * or if we set *skip = 0:
2738 *  - con->in_msg is non-null.
2739 * On error (ENOMEM, EAGAIN, ...),
2740 *  - con->in_msg == NULL
2741 */
2742static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2743{
2744        struct ceph_msg_header *hdr = &con->in_hdr;
2745        int type = le16_to_cpu(hdr->type);
2746        int front_len = le32_to_cpu(hdr->front_len);
2747        int middle_len = le32_to_cpu(hdr->middle_len);
2748        int ret = 0;
2749
2750        BUG_ON(con->in_msg != NULL);
2751
2752        if (con->ops->alloc_msg) {
2753                struct ceph_msg *msg;
2754
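                    /*
                     * Drop con->mutex while calling out to alloc_msg; the
                     * connection state can change in the meantime, so it is
                     * rechecked below.
                     */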
2755                mutex_unlock(&con->mutex);
2756                msg = con->ops->alloc_msg(con, hdr, skip);
2757                mutex_lock(&con->mutex);
2758                if (con->state != CON_STATE_OPEN) {
2759                        if (msg)
2760                                ceph_msg_put(msg);
2761                        return -EAGAIN;
2762                }
2763                con->in_msg = msg;
2764                if (con->in_msg) {
2765                        con->in_msg->con = con->ops->get(con);
2766                        BUG_ON(con->in_msg->con == NULL);
2767                }
2768                if (*skip) {
2769                        con->in_msg = NULL;
2770                        return 0;
2771                }
2772                if (!con->in_msg) {
2773                        con->error_msg =
2774                                "error allocating memory for incoming message";
2775                        return -ENOMEM;
2776                }
2777        }
2778        if (!con->in_msg) {
2779                con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2780                if (!con->in_msg) {
2781                        pr_err("unable to allocate msg type %d len %d\n",
2782                               type, front_len);
2783                        return -ENOMEM;
2784                }
2785                con->in_msg->con = con->ops->get(con);
2786                BUG_ON(con->in_msg->con == NULL);
2787                con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
2788        }
2789        memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2790
2791        if (middle_len && !con->in_msg->middle) {
2792                ret = ceph_alloc_middle(con, con->in_msg);
2793                if (ret < 0) {
2794                        ceph_msg_put(con->in_msg);
2795                        con->in_msg = NULL;
2796                }
2797        }
2798
2799        return ret;
2800}
2801
2802
2803/*
2804 * Free a generically kmalloc'd message.
2805 */
2806void ceph_msg_kfree(struct ceph_msg *m)
2807{
2808        dout("msg_kfree %p\n", m);
2809        if (m->front_is_vmalloc)
2810                vfree(m->front.iov_base);
2811        else
2812                kfree(m->front.iov_base);
2813        kfree(m);
2814}
2815
2816/*
2817 * Drop a msg ref.  Destroy as needed.
2818 */
2819void ceph_msg_last_put(struct kref *kref)
2820{
2821        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
2822
2823        dout("ceph_msg_put last one on %p\n", m);
2824        WARN_ON(!list_empty(&m->list_head));
2825
2826        /* drop middle, data, if any */
2827        if (m->middle) {
2828                ceph_buffer_put(m->middle);
2829                m->middle = NULL;
2830        }
2831        m->nr_pages = 0;
2832        m->pages = NULL;
2833
2834        if (m->pagelist) {
2835                ceph_pagelist_release(m->pagelist);
2836                kfree(m->pagelist);
2837                m->pagelist = NULL;
2838        }
2839
2840        m->trail = NULL;
2841
2842        if (m->pool)
2843                ceph_msgpool_put(m->pool, m);
2844        else
2845                ceph_msg_kfree(m);
2846}
2847EXPORT_SYMBOL(ceph_msg_last_put);
2848
2849void ceph_msg_dump(struct ceph_msg *msg)
2850{
2851        pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
2852                 msg->front_max, msg->nr_pages);
2853        print_hex_dump(KERN_DEBUG, "header: ",
2854                       DUMP_PREFIX_OFFSET, 16, 1,
2855                       &msg->hdr, sizeof(msg->hdr), true);
2856        print_hex_dump(KERN_DEBUG, " front: ",
2857                       DUMP_PREFIX_OFFSET, 16, 1,
2858                       msg->front.iov_base, msg->front.iov_len, true);
2859        if (msg->middle)
2860                print_hex_dump(KERN_DEBUG, "middle: ",
2861                               DUMP_PREFIX_OFFSET, 16, 1,
2862                               msg->middle->vec.iov_base,
2863                               msg->middle->vec.iov_len, true);
2864        print_hex_dump(KERN_DEBUG, "footer: ",
2865                       DUMP_PREFIX_OFFSET, 16, 1,
2866                       &msg->footer, sizeof(msg->footer), true);
2867}
2868EXPORT_SYMBOL(ceph_msg_dump);
2869