linux/net/sunrpc/xprtsock.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/xprtsock.c
   3 *
   4 * Client-side transport implementation for sockets.
   5 *
   6 * TCP callback races fixes (C) 1998 Red Hat
   7 * TCP send fixes (C) 1998 Red Hat
   8 * TCP NFS related read + write fixes
   9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  10 *
  11 * Rewrite of large parts of the code in order to stabilize TCP stuff.
  12 * Fix behaviour when socket buffer is full.
  13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  14 *
  15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
  16 *
  17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
  18 *   <gilles.quillard@bull.net>
  19 */
  20
  21#include <linux/types.h>
  22#include <linux/slab.h>
  23#include <linux/module.h>
  24#include <linux/capability.h>
  25#include <linux/pagemap.h>
  26#include <linux/errno.h>
  27#include <linux/socket.h>
  28#include <linux/in.h>
  29#include <linux/net.h>
  30#include <linux/mm.h>
  31#include <linux/udp.h>
  32#include <linux/tcp.h>
  33#include <linux/sunrpc/clnt.h>
  34#include <linux/sunrpc/sched.h>
  35#include <linux/sunrpc/xprtsock.h>
  36#include <linux/file.h>
  37
  38#include <net/sock.h>
  39#include <net/checksum.h>
  40#include <net/udp.h>
  41#include <net/tcp.h>
  42
/*
 * xprtsock tunables
 *
 * Slot table sizes for UDP and TCP transports and the bounds of the
 * reserved port range.  When RPC_DEBUG is defined these four values are
 * exported through the sysctl table defined further down in this file.
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/*
 * Default value of xs_tcp_fin_timeout: 15 seconds, expressed in jiffies.
 * The variable is exposed as the "tcp_fin_timeout" sysctl when RPC_DEBUG
 * is defined.
 */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
  54
  55/*
  56 * We can register our own files under /proc/sys/sunrpc by
  57 * calling register_sysctl_table() again.  The files in that
  58 * directory become the union of all files registered there.
  59 *
  60 * We simply need to make sure that we don't collide with
  61 * someone else's file names!
  62 */
  63
  64#ifdef RPC_DEBUG
  65
/*
 * Lower and upper bounds referenced through .extra1/.extra2 by the
 * proc_dointvec_minmax entries of xs_tunables_table.
 */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/* NOTE(review): presumably the handle of the registered sysctl table; the
 * registration code is not in this part of the file - confirm. */
static struct ctl_table_header *sunrpc_table_header;
  72
  73/*
  74 * FIXME: changing the UDP slot table size should also resize the UDP
  75 *        socket buffers for existing UDP transports
  76 */
  77static ctl_table xs_tunables_table[] = {
  78        {
  79                .ctl_name       = CTL_SLOTTABLE_UDP,
  80                .procname       = "udp_slot_table_entries",
  81                .data           = &xprt_udp_slot_table_entries,
  82                .maxlen         = sizeof(unsigned int),
  83                .mode           = 0644,
  84                .proc_handler   = &proc_dointvec_minmax,
  85                .strategy       = &sysctl_intvec,
  86                .extra1         = &min_slot_table_size,
  87                .extra2         = &max_slot_table_size
  88        },
  89        {
  90                .ctl_name       = CTL_SLOTTABLE_TCP,
  91                .procname       = "tcp_slot_table_entries",
  92                .data           = &xprt_tcp_slot_table_entries,
  93                .maxlen         = sizeof(unsigned int),
  94                .mode           = 0644,
  95                .proc_handler   = &proc_dointvec_minmax,
  96                .strategy       = &sysctl_intvec,
  97                .extra1         = &min_slot_table_size,
  98                .extra2         = &max_slot_table_size
  99        },
 100        {
 101                .ctl_name       = CTL_MIN_RESVPORT,
 102                .procname       = "min_resvport",
 103                .data           = &xprt_min_resvport,
 104                .maxlen         = sizeof(unsigned int),
 105                .mode           = 0644,
 106                .proc_handler   = &proc_dointvec_minmax,
 107                .strategy       = &sysctl_intvec,
 108                .extra1         = &xprt_min_resvport_limit,
 109                .extra2         = &xprt_max_resvport_limit
 110        },
 111        {
 112                .ctl_name       = CTL_MAX_RESVPORT,
 113                .procname       = "max_resvport",
 114                .data           = &xprt_max_resvport,
 115                .maxlen         = sizeof(unsigned int),
 116                .mode           = 0644,
 117                .proc_handler   = &proc_dointvec_minmax,
 118                .strategy       = &sysctl_intvec,
 119                .extra1         = &xprt_min_resvport_limit,
 120                .extra2         = &xprt_max_resvport_limit
 121        },
 122        {
 123                .procname       = "tcp_fin_timeout",
 124                .data           = &xs_tcp_fin_timeout,
 125                .maxlen         = sizeof(xs_tcp_fin_timeout),
 126                .mode           = 0644,
 127                .proc_handler   = &proc_dointvec_jiffies,
 128                .strategy       = sysctl_jiffies
 129        },
 130        {
 131                .ctl_name = 0,
 132        },
 133};
 134
 135static ctl_table sunrpc_table[] = {
 136        {
 137                .ctl_name       = CTL_SUNRPC,
 138                .procname       = "sunrpc",
 139                .mode           = 0555,
 140                .child          = xs_tunables_table
 141        },
 142        {
 143                .ctl_name = 0,
 144        },
 145};
 146
 147#endif
 148
/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.  5 seconds, in jiffies.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.  60 seconds, in jiffies.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.  60 seconds, in
 * jiffies.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.  2 seconds, in jiffies.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 *
 * The initial value is 3 seconds and the ceiling is 5 minutes, both in
 * jiffies.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.  5 minutes, in
 * jiffies.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
 192
 193#ifdef RPC_DEBUG
 194# undef  RPC_DEBUG_DATA
 195# define RPCDBG_FACILITY        RPCDBG_TRANS
 196#endif
 197
 198#ifdef RPC_DEBUG_DATA
 199static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 200{
 201        u8 *buf = (u8 *) packet;
 202        int j;
 203
 204        dprintk("RPC:       %s\n", msg);
 205        for (j = 0; j < count && j < 128; j += 4) {
 206                if (!(j & 31)) {
 207                        if (j)
 208                                dprintk("\n");
 209                        dprintk("0x%04x ", j);
 210                }
 211                dprintk("%02x%02x%02x%02x ",
 212                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
 213        }
 214        dprintk("\n");
 215}
 216#else
 217static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 218{
 219        /* NOP */
 220}
 221#endif
 222
 223struct sock_xprt {
 224        struct rpc_xprt         xprt;
 225
 226        /*
 227         * Network layer
 228         */
 229        struct socket *         sock;
 230        struct sock *           inet;
 231
 232        /*
 233         * State of TCP reply receive
 234         */
 235        __be32                  tcp_fraghdr,
 236                                tcp_xid;
 237
 238        u32                     tcp_offset,
 239                                tcp_reclen;
 240
 241        unsigned long           tcp_copied,
 242                                tcp_flags;
 243
 244        /*
 245         * Connection of transports
 246         */
 247        struct delayed_work     connect_worker;
 248        struct sockaddr_storage addr;
 249        unsigned short          port;
 250
 251        /*
 252         * UDP socket buffer size parameters
 253         */
 254        size_t                  rcvsize,
 255                                sndsize;
 256
 257        /*
 258         * Saved socket callback addresses
 259         */
 260        void                    (*old_data_ready)(struct sock *, int);
 261        void                    (*old_state_change)(struct sock *);
 262        void                    (*old_write_space)(struct sock *);
 263        void                    (*old_error_report)(struct sock *);
 264};
 265
/*
 * TCP receive state flags
 *
 * Each flag is a distinct bit in an unsigned long.
 * NOTE(review): presumably stored in sock_xprt.tcp_flags; the users are
 * not in this part of the file - confirm.
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
 273
 274static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 275{
 276        return (struct sockaddr *) &xprt->addr;
 277}
 278
 279static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
 280{
 281        return (struct sockaddr_in *) &xprt->addr;
 282}
 283
 284static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
 285{
 286        return (struct sockaddr_in6 *) &xprt->addr;
 287}
 288
 289static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
 290                                          const char *protocol,
 291                                          const char *netid)
 292{
 293        struct sockaddr_in *addr = xs_addr_in(xprt);
 294        char *buf;
 295
 296        buf = kzalloc(20, GFP_KERNEL);
 297        if (buf) {
 298                snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
 299        }
 300        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 301
 302        buf = kzalloc(8, GFP_KERNEL);
 303        if (buf) {
 304                snprintf(buf, 8, "%u",
 305                                ntohs(addr->sin_port));
 306        }
 307        xprt->address_strings[RPC_DISPLAY_PORT] = buf;
 308
 309        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 310
 311        buf = kzalloc(48, GFP_KERNEL);
 312        if (buf) {
 313                snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
 314                        &addr->sin_addr.s_addr,
 315                        ntohs(addr->sin_port),
 316                        protocol);
 317        }
 318        xprt->address_strings[RPC_DISPLAY_ALL] = buf;
 319
 320        buf = kzalloc(10, GFP_KERNEL);
 321        if (buf) {
 322                snprintf(buf, 10, "%02x%02x%02x%02x",
 323                                NIPQUAD(addr->sin_addr.s_addr));
 324        }
 325        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
 326
 327        buf = kzalloc(8, GFP_KERNEL);
 328        if (buf) {
 329                snprintf(buf, 8, "%4hx",
 330                                ntohs(addr->sin_port));
 331        }
 332        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
 333
 334        buf = kzalloc(30, GFP_KERNEL);
 335        if (buf) {
 336                snprintf(buf, 30, "%pI4.%u.%u",
 337                                &addr->sin_addr.s_addr,
 338                                ntohs(addr->sin_port) >> 8,
 339                                ntohs(addr->sin_port) & 0xff);
 340        }
 341        xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
 342
 343        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 344}
 345
 346static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
 347                                          const char *protocol,
 348                                          const char *netid)
 349{
 350        struct sockaddr_in6 *addr = xs_addr_in6(xprt);
 351        char *buf;
 352
 353        buf = kzalloc(40, GFP_KERNEL);
 354        if (buf) {
 355                snprintf(buf, 40, "%pI6",&addr->sin6_addr);
 356        }
 357        xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
 358
 359        buf = kzalloc(8, GFP_KERNEL);
 360        if (buf) {
 361                snprintf(buf, 8, "%u",
 362                                ntohs(addr->sin6_port));
 363        }
 364        xprt->address_strings[RPC_DISPLAY_PORT] = buf;
 365
 366        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 367
 368        buf = kzalloc(64, GFP_KERNEL);
 369        if (buf) {
 370                snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
 371                                &addr->sin6_addr,
 372                                ntohs(addr->sin6_port),
 373                                protocol);
 374        }
 375        xprt->address_strings[RPC_DISPLAY_ALL] = buf;
 376
 377        buf = kzalloc(36, GFP_KERNEL);
 378        if (buf)
 379                snprintf(buf, 36, "%pi6", &addr->sin6_addr);
 380
 381        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
 382
 383        buf = kzalloc(8, GFP_KERNEL);
 384        if (buf) {
 385                snprintf(buf, 8, "%4hx",
 386                                ntohs(addr->sin6_port));
 387        }
 388        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
 389
 390        buf = kzalloc(50, GFP_KERNEL);
 391        if (buf) {
 392                snprintf(buf, 50, "%pI6.%u.%u",
 393                         &addr->sin6_addr,
 394                         ntohs(addr->sin6_port) >> 8,
 395                         ntohs(addr->sin6_port) & 0xff);
 396        }
 397        xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
 398
 399        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 400}
 401
 402static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 403{
 404        unsigned int i;
 405
 406        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 407                switch (i) {
 408                case RPC_DISPLAY_PROTO:
 409                case RPC_DISPLAY_NETID:
 410                        continue;
 411                default:
 412                        kfree(xprt->address_strings[i]);
 413                }
 414}
 415
/* Flags used for every send in this file: never block, never raise SIGPIPE. */
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 417
 418static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 419{
 420        struct msghdr msg = {
 421                .msg_name       = addr,
 422                .msg_namelen    = addrlen,
 423                .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
 424        };
 425        struct kvec iov = {
 426                .iov_base       = vec->iov_base + base,
 427                .iov_len        = vec->iov_len - base,
 428        };
 429
 430        if (iov.iov_len != 0)
 431                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
 432        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
 433}
 434
 435static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
 436{
 437        struct page **ppage;
 438        unsigned int remainder;
 439        int err, sent = 0;
 440
 441        remainder = xdr->page_len - base;
 442        base += xdr->page_base;
 443        ppage = xdr->pages + (base >> PAGE_SHIFT);
 444        base &= ~PAGE_MASK;
 445        for(;;) {
 446                unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
 447                int flags = XS_SENDMSG_FLAGS;
 448
 449                remainder -= len;
 450                if (remainder != 0 || more)
 451                        flags |= MSG_MORE;
 452                err = sock->ops->sendpage(sock, *ppage, base, len, flags);
 453                if (remainder == 0 || err != len)
 454                        break;
 455                sent += err;
 456                ppage++;
 457                base = 0;
 458        }
 459        if (sent == 0)
 460                return err;
 461        if (err > 0)
 462                sent += err;
 463        return sent;
 464}
 465
 466/**
 467 * xs_sendpages - write pages directly to a socket
 468 * @sock: socket to send on
 469 * @addr: UDP only -- address of destination
 470 * @addrlen: UDP only -- length of destination address
 471 * @xdr: buffer containing this request
 472 * @base: starting position in the buffer
 473 *
 474 */
 475static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
 476{
 477        unsigned int remainder = xdr->len - base;
 478        int err, sent = 0;
 479
 480        if (unlikely(!sock))
 481                return -ENOTSOCK;
 482
 483        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
 484        if (base != 0) {
 485                addr = NULL;
 486                addrlen = 0;
 487        }
 488
 489        if (base < xdr->head[0].iov_len || addr != NULL) {
 490                unsigned int len = xdr->head[0].iov_len - base;
 491                remainder -= len;
 492                err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
 493                if (remainder == 0 || err != len)
 494                        goto out;
 495                sent += err;
 496                base = 0;
 497        } else
 498                base -= xdr->head[0].iov_len;
 499
 500        if (base < xdr->page_len) {
 501                unsigned int len = xdr->page_len - base;
 502                remainder -= len;
 503                err = xs_send_pagedata(sock, xdr, base, remainder != 0);
 504                if (remainder == 0 || err != len)
 505                        goto out;
 506                sent += err;
 507                base = 0;
 508        } else
 509                base -= xdr->page_len;
 510
 511        if (base >= xdr->tail[0].iov_len)
 512                return sent;
 513        err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
 514out:
 515        if (sent == 0)
 516                return err;
 517        if (err > 0)
 518                sent += err;
 519        return sent;
 520}
 521
 522static void xs_nospace_callback(struct rpc_task *task)
 523{
 524        struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
 525
 526        transport->inet->sk_write_pending--;
 527        clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 528}
 529
 530/**
 531 * xs_nospace - place task on wait queue if transmit was incomplete
 532 * @task: task to put to sleep
 533 *
 534 */
 535static int xs_nospace(struct rpc_task *task)
 536{
 537        struct rpc_rqst *req = task->tk_rqstp;
 538        struct rpc_xprt *xprt = req->rq_xprt;
 539        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 540        int ret = 0;
 541
 542        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
 543                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 544                        req->rq_slen);
 545
 546        /* Protect against races with write_space */
 547        spin_lock_bh(&xprt->transport_lock);
 548
 549        /* Don't race with disconnect */
 550        if (xprt_connected(xprt)) {
 551                if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
 552                        ret = -EAGAIN;
 553                        /*
 554                         * Notify TCP that we're limited by the application
 555                         * window size
 556                         */
 557                        set_bit(SOCK_NOSPACE, &transport->sock->flags);
 558                        transport->inet->sk_write_pending++;
 559                        /* ...and wait for more buffer space */
 560                        xprt_wait_for_buffer_space(task, xs_nospace_callback);
 561                }
 562        } else {
 563                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 564                ret = -ENOTCONN;
 565        }
 566
 567        spin_unlock_bh(&xprt->transport_lock);
 568        return ret;
 569}
 570
 571/**
 572 * xs_udp_send_request - write an RPC request to a UDP socket
 573 * @task: address of RPC task that manages the state of an RPC request
 574 *
 575 * Return values:
 576 *        0:    The request has been sent
 577 *   EAGAIN:    The socket was blocked, please call again later to
 578 *              complete the request
 579 * ENOTCONN:    Caller needs to invoke connect logic then call again
 580 *    other:    Some other error occured, the request was not sent
 581 */
 582static int xs_udp_send_request(struct rpc_task *task)
 583{
 584        struct rpc_rqst *req = task->tk_rqstp;
 585        struct rpc_xprt *xprt = req->rq_xprt;
 586        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 587        struct xdr_buf *xdr = &req->rq_snd_buf;
 588        int status;
 589
 590        xs_pktdump("packet data:",
 591                                req->rq_svec->iov_base,
 592                                req->rq_svec->iov_len);
 593
 594        if (!xprt_bound(xprt))
 595                return -ENOTCONN;
 596        status = xs_sendpages(transport->sock,
 597                              xs_addr(xprt),
 598                              xprt->addrlen, xdr,
 599                              req->rq_bytes_sent);
 600
 601        dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
 602                        xdr->len - req->rq_bytes_sent, status);
 603
 604        if (status >= 0) {
 605                task->tk_bytes_sent += status;
 606                if (status >= req->rq_slen)
 607                        return 0;
 608                /* Still some bytes left; set up for a retry later. */
 609                status = -EAGAIN;
 610        }
 611        if (!transport->sock)
 612                goto out;
 613
 614        switch (status) {
 615        case -ENOTSOCK:
 616                status = -ENOTCONN;
 617                /* Should we call xs_close() here? */
 618                break;
 619        case -EAGAIN:
 620                status = xs_nospace(task);
 621                break;
 622        default:
 623                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 624                        -status);
 625        case -ENETUNREACH:
 626        case -EPIPE:
 627        case -ECONNREFUSED:
 628                /* When the server has died, an ICMP port unreachable message
 629                 * prompts ECONNREFUSED. */
 630                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 631        }
 632out:
 633        return status;
 634}
 635
 636/**
 637 * xs_tcp_shutdown - gracefully shut down a TCP socket
 638 * @xprt: transport
 639 *
 640 * Initiates a graceful shutdown of the TCP socket by calling the
 641 * equivalent of shutdown(SHUT_WR);
 642 */
 643static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 644{
 645        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 646        struct socket *sock = transport->sock;
 647
 648        if (sock != NULL)
 649                kernel_sock_shutdown(sock, SHUT_WR);
 650}
 651
 652static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
 653{
 654        u32 reclen = buf->len - sizeof(rpc_fraghdr);
 655        rpc_fraghdr *base = buf->head[0].iov_base;
 656        *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
 657}
 658
 659/**
 660 * xs_tcp_send_request - write an RPC request to a TCP socket
 661 * @task: address of RPC task that manages the state of an RPC request
 662 *
 663 * Return values:
 664 *        0:    The request has been sent
 665 *   EAGAIN:    The socket was blocked, please call again later to
 666 *              complete the request
 667 * ENOTCONN:    Caller needs to invoke connect logic then call again
 668 *    other:    Some other error occured, the request was not sent
 669 *
 670 * XXX: In the case of soft timeouts, should we eventually give up
 671 *      if sendmsg is not able to make progress?
 672 */
 673static int xs_tcp_send_request(struct rpc_task *task)
 674{
 675        struct rpc_rqst *req = task->tk_rqstp;
 676        struct rpc_xprt *xprt = req->rq_xprt;
 677        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 678        struct xdr_buf *xdr = &req->rq_snd_buf;
 679        int status;
 680
 681        xs_encode_tcp_record_marker(&req->rq_snd_buf);
 682
 683        xs_pktdump("packet data:",
 684                                req->rq_svec->iov_base,
 685                                req->rq_svec->iov_len);
 686
 687        /* Continue transmitting the packet/record. We must be careful
 688         * to cope with writespace callbacks arriving _after_ we have
 689         * called sendmsg(). */
 690        while (1) {
 691                status = xs_sendpages(transport->sock,
 692                                        NULL, 0, xdr, req->rq_bytes_sent);
 693
 694                dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
 695                                xdr->len - req->rq_bytes_sent, status);
 696
 697                if (unlikely(status < 0))
 698                        break;
 699
 700                /* If we've sent the entire packet, immediately
 701                 * reset the count of bytes sent. */
 702                req->rq_bytes_sent += status;
 703                task->tk_bytes_sent += status;
 704                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 705                        req->rq_bytes_sent = 0;
 706                        return 0;
 707                }
 708
 709                if (status != 0)
 710                        continue;
 711                status = -EAGAIN;
 712                break;
 713        }
 714        if (!transport->sock)
 715                goto out;
 716
 717        switch (status) {
 718        case -ENOTSOCK:
 719                status = -ENOTCONN;
 720                /* Should we call xs_close() here? */
 721                break;
 722        case -EAGAIN:
 723                status = xs_nospace(task);
 724                break;
 725        default:
 726                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 727                        -status);
 728        case -ECONNRESET:
 729        case -EPIPE:
 730                xs_tcp_shutdown(xprt);
 731        case -ECONNREFUSED:
 732        case -ENOTCONN:
 733                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 734        }
 735out:
 736        return status;
 737}
 738
 739/**
 740 * xs_tcp_release_xprt - clean up after a tcp transmission
 741 * @xprt: transport
 742 * @task: rpc task
 743 *
 744 * This cleans up if an error causes us to abort the transmission of a request.
 745 * In this case, the socket may need to be reset in order to avoid confusing
 746 * the server.
 747 */
 748static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 749{
 750        struct rpc_rqst *req;
 751
 752        if (task != xprt->snd_task)
 753                return;
 754        if (task == NULL)
 755                goto out_release;
 756        req = task->tk_rqstp;
 757        if (req->rq_bytes_sent == 0)
 758                goto out_release;
 759        if (req->rq_bytes_sent == req->rq_snd_buf.len)
 760                goto out_release;
 761        set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
 762out_release:
 763        xprt_release_xprt(xprt, task);
 764}
 765
 766static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 767{
 768        transport->old_data_ready = sk->sk_data_ready;
 769        transport->old_state_change = sk->sk_state_change;
 770        transport->old_write_space = sk->sk_write_space;
 771        transport->old_error_report = sk->sk_error_report;
 772}
 773
 774static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 775{
 776        sk->sk_data_ready = transport->old_data_ready;
 777        sk->sk_state_change = transport->old_state_change;
 778        sk->sk_write_space = transport->old_write_space;
 779        sk->sk_error_report = transport->old_error_report;
 780}
 781
 782static void xs_reset_transport(struct sock_xprt *transport)
 783{
 784        struct socket *sock = transport->sock;
 785        struct sock *sk = transport->inet;
 786
 787        if (sk == NULL)
 788                return;
 789
 790        write_lock_bh(&sk->sk_callback_lock);
 791        transport->inet = NULL;
 792        transport->sock = NULL;
 793
 794        sk->sk_user_data = NULL;
 795
 796        xs_restore_old_callbacks(transport, sk);
 797        write_unlock_bh(&sk->sk_callback_lock);
 798
 799        sk->sk_no_check = 0;
 800
 801        sock_release(sock);
 802}
 803
 804/**
 805 * xs_close - close a socket
 806 * @xprt: transport
 807 *
 808 * This is used when all requests are complete; ie, no DRC state remains
 809 * on the server we want to save.
 810 *
 811 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 812 * xs_reset_transport() zeroing the socket from underneath a writer.
 813 */
 814static void xs_close(struct rpc_xprt *xprt)
 815{
 816        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 817
 818        dprintk("RPC:       xs_close xprt %p\n", xprt);
 819
 820        xs_reset_transport(transport);
 821
 822        smp_mb__before_clear_bit();
 823        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
 824        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 825        clear_bit(XPRT_CLOSING, &xprt->state);
 826        smp_mb__after_clear_bit();
 827        xprt_disconnect_done(xprt);
 828}
 829
 830static void xs_tcp_close(struct rpc_xprt *xprt)
 831{
 832        if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
 833                xs_close(xprt);
 834        else
 835                xs_tcp_shutdown(xprt);
 836}
 837
 838/**
 839 * xs_destroy - prepare to shutdown a transport
 840 * @xprt: doomed transport
 841 *
 842 */
 843static void xs_destroy(struct rpc_xprt *xprt)
 844{
 845        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 846
 847        dprintk("RPC:       xs_destroy xprt %p\n", xprt);
 848
 849        cancel_rearming_delayed_work(&transport->connect_worker);
 850
 851        xs_close(xprt);
 852        xs_free_peer_addresses(xprt);
 853        kfree(xprt->slot);
 854        kfree(xprt);
 855        module_put(THIS_MODULE);
 856}
 857
 858static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
 859{
 860        return (struct rpc_xprt *) sk->sk_user_data;
 861}
 862
 863/**
 864 * xs_udp_data_ready - "data ready" callback for UDP sockets
 865 * @sk: socket with data to read
 866 * @len: how much data to read
 867 *
 868 */
 869static void xs_udp_data_ready(struct sock *sk, int len)
 870{
 871        struct rpc_task *task;
 872        struct rpc_xprt *xprt;
 873        struct rpc_rqst *rovr;
 874        struct sk_buff *skb;
 875        int err, repsize, copied;
 876        u32 _xid;
 877        __be32 *xp;
 878
 879        read_lock(&sk->sk_callback_lock);
 880        dprintk("RPC:       xs_udp_data_ready...\n");
 881        if (!(xprt = xprt_from_sock(sk)))
 882                goto out;
 883
 884        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 885                goto out;
 886
 887        if (xprt->shutdown)
 888                goto dropit;
 889
 890        repsize = skb->len - sizeof(struct udphdr);
 891        if (repsize < 4) {
 892                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
 893                goto dropit;
 894        }
 895
 896        /* Copy the XID from the skb... */
 897        xp = skb_header_pointer(skb, sizeof(struct udphdr),
 898                                sizeof(_xid), &_xid);
 899        if (xp == NULL)
 900                goto dropit;
 901
 902        /* Look up and lock the request corresponding to the given XID */
 903        spin_lock(&xprt->transport_lock);
 904        rovr = xprt_lookup_rqst(xprt, *xp);
 905        if (!rovr)
 906                goto out_unlock;
 907        task = rovr->rq_task;
 908
 909        if ((copied = rovr->rq_private_buf.buflen) > repsize)
 910                copied = repsize;
 911
 912        /* Suck it into the iovec, verify checksum if not done by hw. */
 913        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 914                UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
 915                goto out_unlock;
 916        }
 917
 918        UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
 919
 920        /* Something worked... */
 921        dst_confirm(skb->dst);
 922
 923        xprt_adjust_cwnd(task, copied);
 924        xprt_update_rtt(task);
 925        xprt_complete_rqst(task, copied);
 926
 927 out_unlock:
 928        spin_unlock(&xprt->transport_lock);
 929 dropit:
 930        skb_free_datagram(sk, skb);
 931 out:
 932        read_unlock(&sk->sk_callback_lock);
 933}
 934
 935static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
 936{
 937        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 938        size_t len, used;
 939        char *p;
 940
 941        p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
 942        len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
 943        used = xdr_skb_read_bits(desc, p, len);
 944        transport->tcp_offset += used;
 945        if (used != len)
 946                return;
 947
 948        transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
 949        if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
 950                transport->tcp_flags |= TCP_RCV_LAST_FRAG;
 951        else
 952                transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
 953        transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
 954
 955        transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
 956        transport->tcp_offset = 0;
 957
 958        /* Sanity check of the record length */
 959        if (unlikely(transport->tcp_reclen < 4)) {
 960                dprintk("RPC:       invalid TCP record fragment length\n");
 961                xprt_force_disconnect(xprt);
 962                return;
 963        }
 964        dprintk("RPC:       reading TCP record fragment of length %d\n",
 965                        transport->tcp_reclen);
 966}
 967
 968static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
 969{
 970        if (transport->tcp_offset == transport->tcp_reclen) {
 971                transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
 972                transport->tcp_offset = 0;
 973                if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
 974                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
 975                        transport->tcp_flags |= TCP_RCV_COPY_XID;
 976                        transport->tcp_copied = 0;
 977                }
 978        }
 979}
 980
 981static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
 982{
 983        size_t len, used;
 984        char *p;
 985
 986        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
 987        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
 988        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
 989        used = xdr_skb_read_bits(desc, p, len);
 990        transport->tcp_offset += used;
 991        if (used != len)
 992                return;
 993        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
 994        transport->tcp_flags |= TCP_RCV_COPY_DATA;
 995        transport->tcp_copied = 4;
 996        dprintk("RPC:       reading reply for XID %08x\n",
 997                        ntohl(transport->tcp_xid));
 998        xs_tcp_check_fraghdr(transport);
 999}
1000
1001static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1002{
1003        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1004        struct rpc_rqst *req;
1005        struct xdr_buf *rcvbuf;
1006        size_t len;
1007        ssize_t r;
1008
1009        /* Find and lock the request corresponding to this xid */
1010        spin_lock(&xprt->transport_lock);
1011        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1012        if (!req) {
1013                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1014                dprintk("RPC:       XID %08x request not found!\n",
1015                                ntohl(transport->tcp_xid));
1016                spin_unlock(&xprt->transport_lock);
1017                return;
1018        }
1019
1020        rcvbuf = &req->rq_private_buf;
1021        len = desc->count;
1022        if (len > transport->tcp_reclen - transport->tcp_offset) {
1023                struct xdr_skb_reader my_desc;
1024
1025                len = transport->tcp_reclen - transport->tcp_offset;
1026                memcpy(&my_desc, desc, sizeof(my_desc));
1027                my_desc.count = len;
1028                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1029                                          &my_desc, xdr_skb_read_bits);
1030                desc->count -= r;
1031                desc->offset += r;
1032        } else
1033                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1034                                          desc, xdr_skb_read_bits);
1035
1036        if (r > 0) {
1037                transport->tcp_copied += r;
1038                transport->tcp_offset += r;
1039        }
1040        if (r != len) {
1041                /* Error when copying to the receive buffer,
1042                 * usually because we weren't able to allocate
1043                 * additional buffer pages. All we can do now
1044                 * is turn off TCP_RCV_COPY_DATA, so the request
1045                 * will not receive any additional updates,
1046                 * and time out.
1047                 * Any remaining data from this record will
1048                 * be discarded.
1049                 */
1050                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1051                dprintk("RPC:       XID %08x truncated request\n",
1052                                ntohl(transport->tcp_xid));
1053                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1054                                "tcp_offset = %u, tcp_reclen = %u\n",
1055                                xprt, transport->tcp_copied,
1056                                transport->tcp_offset, transport->tcp_reclen);
1057                goto out;
1058        }
1059
1060        dprintk("RPC:       XID %08x read %Zd bytes\n",
1061                        ntohl(transport->tcp_xid), r);
1062        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1063                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1064                        transport->tcp_offset, transport->tcp_reclen);
1065
1066        if (transport->tcp_copied == req->rq_private_buf.buflen)
1067                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1068        else if (transport->tcp_offset == transport->tcp_reclen) {
1069                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1070                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1071        }
1072
1073out:
1074        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1075                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1076        spin_unlock(&xprt->transport_lock);
1077        xs_tcp_check_fraghdr(transport);
1078}
1079
1080static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1081{
1082        size_t len;
1083
1084        len = transport->tcp_reclen - transport->tcp_offset;
1085        if (len > desc->count)
1086                len = desc->count;
1087        desc->count -= len;
1088        desc->offset += len;
1089        transport->tcp_offset += len;
1090        dprintk("RPC:       discarded %Zu bytes\n", len);
1091        xs_tcp_check_fraghdr(transport);
1092}
1093
1094static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1095{
1096        struct rpc_xprt *xprt = rd_desc->arg.data;
1097        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1098        struct xdr_skb_reader desc = {
1099                .skb    = skb,
1100                .offset = offset,
1101                .count  = len,
1102        };
1103
1104        dprintk("RPC:       xs_tcp_data_recv started\n");
1105        do {
1106                /* Read in a new fragment marker if necessary */
1107                /* Can we ever really expect to get completely empty fragments? */
1108                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1109                        xs_tcp_read_fraghdr(xprt, &desc);
1110                        continue;
1111                }
1112                /* Read in the xid if necessary */
1113                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1114                        xs_tcp_read_xid(transport, &desc);
1115                        continue;
1116                }
1117                /* Read in the request data */
1118                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1119                        xs_tcp_read_request(xprt, &desc);
1120                        continue;
1121                }
1122                /* Skip over any trailing bytes on short reads */
1123                xs_tcp_read_discard(transport, &desc);
1124        } while (desc.count);
1125        dprintk("RPC:       xs_tcp_data_recv done\n");
1126        return len - desc.count;
1127}
1128
1129/**
1130 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1131 * @sk: socket with data to read
1132 * @bytes: how much data to read
1133 *
1134 */
1135static void xs_tcp_data_ready(struct sock *sk, int bytes)
1136{
1137        struct rpc_xprt *xprt;
1138        read_descriptor_t rd_desc;
1139        int read;
1140
1141        dprintk("RPC:       xs_tcp_data_ready...\n");
1142
1143        read_lock(&sk->sk_callback_lock);
1144        if (!(xprt = xprt_from_sock(sk)))
1145                goto out;
1146        if (xprt->shutdown)
1147                goto out;
1148
1149        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1150        rd_desc.arg.data = xprt;
1151        do {
1152                rd_desc.count = 65536;
1153                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1154        } while (read > 0);
1155out:
1156        read_unlock(&sk->sk_callback_lock);
1157}
1158
1159/*
1160 * Do the equivalent of linger/linger2 handling for dealing with
1161 * broken servers that don't close the socket in a timely
1162 * fashion
1163 */
1164static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1165                unsigned long timeout)
1166{
1167        struct sock_xprt *transport;
1168
1169        if (xprt_test_and_set_connecting(xprt))
1170                return;
1171        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1172        transport = container_of(xprt, struct sock_xprt, xprt);
1173        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1174                           timeout);
1175}
1176
1177static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1178{
1179        struct sock_xprt *transport;
1180
1181        transport = container_of(xprt, struct sock_xprt, xprt);
1182
1183        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1184            !cancel_delayed_work(&transport->connect_worker))
1185                return;
1186        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1187        xprt_clear_connecting(xprt);
1188}
1189
1190static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1191{
1192        smp_mb__before_clear_bit();
1193        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1194        clear_bit(XPRT_CLOSING, &xprt->state);
1195        smp_mb__after_clear_bit();
1196        /* Mark transport as closed and wake up all pending tasks */
1197        xprt_disconnect_done(xprt);
1198}
1199
1200/**
1201 * xs_tcp_state_change - callback to handle TCP socket state changes
1202 * @sk: socket whose state has changed
1203 *
1204 */
1205static void xs_tcp_state_change(struct sock *sk)
1206{
1207        struct rpc_xprt *xprt;
1208
1209        read_lock(&sk->sk_callback_lock);
1210        if (!(xprt = xprt_from_sock(sk)))
1211                goto out;
1212        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1213        dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1214                        sk->sk_state, xprt_connected(xprt),
1215                        sock_flag(sk, SOCK_DEAD),
1216                        sock_flag(sk, SOCK_ZAPPED));
1217
1218        switch (sk->sk_state) {
1219        case TCP_ESTABLISHED:
1220                spin_lock_bh(&xprt->transport_lock);
1221                if (!xprt_test_and_set_connected(xprt)) {
1222                        struct sock_xprt *transport = container_of(xprt,
1223                                        struct sock_xprt, xprt);
1224
1225                        /* Reset TCP record info */
1226                        transport->tcp_offset = 0;
1227                        transport->tcp_reclen = 0;
1228                        transport->tcp_copied = 0;
1229                        transport->tcp_flags =
1230                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1231
1232                        xprt_wake_pending_tasks(xprt, -EAGAIN);
1233                }
1234                spin_unlock_bh(&xprt->transport_lock);
1235                break;
1236        case TCP_FIN_WAIT1:
1237                /* The client initiated a shutdown of the socket */
1238                xprt->connect_cookie++;
1239                xprt->reestablish_timeout = 0;
1240                set_bit(XPRT_CLOSING, &xprt->state);
1241                smp_mb__before_clear_bit();
1242                clear_bit(XPRT_CONNECTED, &xprt->state);
1243                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1244                smp_mb__after_clear_bit();
1245                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1246                break;
1247        case TCP_CLOSE_WAIT:
1248                /* The server initiated a shutdown of the socket */
1249                xprt_force_disconnect(xprt);
1250        case TCP_SYN_SENT:
1251                xprt->connect_cookie++;
1252        case TCP_CLOSING:
1253                /*
1254                 * If the server closed down the connection, make sure that
1255                 * we back off before reconnecting
1256                 */
1257                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1258                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1259                break;
1260        case TCP_LAST_ACK:
1261                set_bit(XPRT_CLOSING, &xprt->state);
1262                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1263                smp_mb__before_clear_bit();
1264                clear_bit(XPRT_CONNECTED, &xprt->state);
1265                smp_mb__after_clear_bit();
1266                break;
1267        case TCP_CLOSE:
1268                xs_tcp_cancel_linger_timeout(xprt);
1269                xs_sock_mark_closed(xprt);
1270        }
1271 out:
1272        read_unlock(&sk->sk_callback_lock);
1273}
1274
1275/**
1276 * xs_error_report - callback mainly for catching socket errors
1277 * @sk: socket
1278 */
1279static void xs_error_report(struct sock *sk)
1280{
1281        struct rpc_xprt *xprt;
1282
1283        read_lock(&sk->sk_callback_lock);
1284        if (!(xprt = xprt_from_sock(sk)))
1285                goto out;
1286        dprintk("RPC:       %s client %p...\n"
1287                        "RPC:       error %d\n",
1288                        __func__, xprt, sk->sk_err);
1289        xprt_wake_pending_tasks(xprt, -EAGAIN);
1290out:
1291        read_unlock(&sk->sk_callback_lock);
1292}
1293
1294static void xs_write_space(struct sock *sk)
1295{
1296        struct socket *sock;
1297        struct rpc_xprt *xprt;
1298
1299        if (unlikely(!(sock = sk->sk_socket)))
1300                return;
1301        clear_bit(SOCK_NOSPACE, &sock->flags);
1302
1303        if (unlikely(!(xprt = xprt_from_sock(sk))))
1304                return;
1305        if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1306                return;
1307
1308        xprt_write_space(xprt);
1309}
1310
1311/**
1312 * xs_udp_write_space - callback invoked when socket buffer space
1313 *                             becomes available
1314 * @sk: socket whose state has changed
1315 *
1316 * Called when more output buffer space is available for this socket.
1317 * We try not to wake our writers until they can make "significant"
1318 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1319 * with a bunch of small requests.
1320 */
1321static void xs_udp_write_space(struct sock *sk)
1322{
1323        read_lock(&sk->sk_callback_lock);
1324
1325        /* from net/core/sock.c:sock_def_write_space */
1326        if (sock_writeable(sk))
1327                xs_write_space(sk);
1328
1329        read_unlock(&sk->sk_callback_lock);
1330}
1331
1332/**
1333 * xs_tcp_write_space - callback invoked when socket buffer space
1334 *                             becomes available
1335 * @sk: socket whose state has changed
1336 *
1337 * Called when more output buffer space is available for this socket.
1338 * We try not to wake our writers until they can make "significant"
1339 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1340 * with a bunch of small requests.
1341 */
1342static void xs_tcp_write_space(struct sock *sk)
1343{
1344        read_lock(&sk->sk_callback_lock);
1345
1346        /* from net/core/stream.c:sk_stream_write_space */
1347        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1348                xs_write_space(sk);
1349
1350        read_unlock(&sk->sk_callback_lock);
1351}
1352
1353static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1354{
1355        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1356        struct sock *sk = transport->inet;
1357
1358        if (transport->rcvsize) {
1359                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1360                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1361        }
1362        if (transport->sndsize) {
1363                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1364                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1365                sk->sk_write_space(sk);
1366        }
1367}
1368
1369/**
1370 * xs_udp_set_buffer_size - set send and receive limits
1371 * @xprt: generic transport
1372 * @sndsize: requested size of send buffer, in bytes
1373 * @rcvsize: requested size of receive buffer, in bytes
1374 *
1375 * Set socket send and receive buffer size limits.
1376 */
1377static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1378{
1379        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1380
1381        transport->sndsize = 0;
1382        if (sndsize)
1383                transport->sndsize = sndsize + 1024;
1384        transport->rcvsize = 0;
1385        if (rcvsize)
1386                transport->rcvsize = rcvsize + 1024;
1387
1388        xs_udp_do_set_buffer_size(xprt);
1389}
1390
1391/**
1392 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1393 * @task: task that timed out
1394 *
1395 * Adjust the congestion window after a retransmit timeout has occurred.
1396 */
1397static void xs_udp_timer(struct rpc_task *task)
1398{
1399        xprt_adjust_cwnd(task, -ETIMEDOUT);
1400}
1401
1402static unsigned short xs_get_random_port(void)
1403{
1404        unsigned short range = xprt_max_resvport - xprt_min_resvport;
1405        unsigned short rand = (unsigned short) net_random() % range;
1406        return rand + xprt_min_resvport;
1407}
1408
1409/**
1410 * xs_set_port - reset the port number in the remote endpoint address
1411 * @xprt: generic transport
1412 * @port: new port number
1413 *
1414 */
1415static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1416{
1417        struct sockaddr *addr = xs_addr(xprt);
1418
1419        dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1420
1421        switch (addr->sa_family) {
1422        case AF_INET:
1423                ((struct sockaddr_in *)addr)->sin_port = htons(port);
1424                break;
1425        case AF_INET6:
1426                ((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1427                break;
1428        default:
1429                BUG();
1430        }
1431}
1432
1433static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1434{
1435        unsigned short port = transport->port;
1436
1437        if (port == 0 && transport->xprt.resvport)
1438                port = xs_get_random_port();
1439        return port;
1440}
1441
1442static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1443{
1444        if (transport->port != 0)
1445                transport->port = 0;
1446        if (!transport->xprt.resvport)
1447                return 0;
1448        if (port <= xprt_min_resvport || port > xprt_max_resvport)
1449                return xprt_max_resvport;
1450        return --port;
1451}
1452
1453static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1454{
1455        struct sockaddr_in myaddr = {
1456                .sin_family = AF_INET,
1457        };
1458        struct sockaddr_in *sa;
1459        int err, nloop = 0;
1460        unsigned short port = xs_get_srcport(transport, sock);
1461        unsigned short last;
1462
1463        sa = (struct sockaddr_in *)&transport->addr;
1464        myaddr.sin_addr = sa->sin_addr;
1465        do {
1466                myaddr.sin_port = htons(port);
1467                err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1468                                                sizeof(myaddr));
1469                if (port == 0)
1470                        break;
1471                if (err == 0) {
1472                        transport->port = port;
1473                        break;
1474                }
1475                last = port;
1476                port = xs_next_srcport(transport, sock, port);
1477                if (port > last)
1478                        nloop++;
1479        } while (err == -EADDRINUSE && nloop != 2);
1480        dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1481                        __func__, &myaddr.sin_addr,
1482                        port, err ? "failed" : "ok", err);
1483        return err;
1484}
1485
1486static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1487{
1488        struct sockaddr_in6 myaddr = {
1489                .sin6_family = AF_INET6,
1490        };
1491        struct sockaddr_in6 *sa;
1492        int err, nloop = 0;
1493        unsigned short port = xs_get_srcport(transport, sock);
1494        unsigned short last;
1495
1496        sa = (struct sockaddr_in6 *)&transport->addr;
1497        myaddr.sin6_addr = sa->sin6_addr;
1498        do {
1499                myaddr.sin6_port = htons(port);
1500                err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1501                                                sizeof(myaddr));
1502                if (port == 0)
1503                        break;
1504                if (err == 0) {
1505                        transport->port = port;
1506                        break;
1507                }
1508                last = port;
1509                port = xs_next_srcport(transport, sock, port);
1510                if (port > last)
1511                        nloop++;
1512        } while (err == -EADDRINUSE && nloop != 2);
1513        dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1514                &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1515        return err;
1516}
1517
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* Lock class keys for RPC sockets: index 0 is AF_INET, index 1 is AF_INET6 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/*
 * Re-initialise the lock classes of an IPv4 RPC socket under
 * RPC-specific names.  The socket must not be owned by a user context.
 */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/*
 * Re-initialise the lock classes of an IPv6 RPC socket under
 * RPC-specific names.  The socket must not be owned by a user context.
 */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC reclassification is a no-op. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1548
1549static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1550{
1551        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1552
1553        if (!transport->inet) {
1554                struct sock *sk = sock->sk;
1555
1556                write_lock_bh(&sk->sk_callback_lock);
1557
1558                xs_save_old_callbacks(transport, sk);
1559
1560                sk->sk_user_data = xprt;
1561                sk->sk_data_ready = xs_udp_data_ready;
1562                sk->sk_write_space = xs_udp_write_space;
1563                sk->sk_error_report = xs_error_report;
1564                sk->sk_no_check = UDP_CSUM_NORCV;
1565                sk->sk_allocation = GFP_ATOMIC;
1566
1567                xprt_set_connected(xprt);
1568
1569                /* Reset to new socket */
1570                transport->sock = sock;
1571                transport->inet = sk;
1572
1573                write_unlock_bh(&sk->sk_callback_lock);
1574        }
1575        xs_udp_do_set_buffer_size(xprt);
1576}
1577
1578/**
1579 * xs_udp_connect_worker4 - set up a UDP socket
1580 * @work: RPC transport to connect
1581 *
1582 * Invoked by a work queue tasklet.
1583 */
1584static void xs_udp_connect_worker4(struct work_struct *work)
1585{
1586        struct sock_xprt *transport =
1587                container_of(work, struct sock_xprt, connect_worker.work);
1588        struct rpc_xprt *xprt = &transport->xprt;
1589        struct socket *sock = transport->sock;
1590        int err, status = -EIO;
1591
1592        if (xprt->shutdown)
1593                goto out;
1594
1595        /* Start by resetting any existing state */
1596        xs_reset_transport(transport);
1597
1598        err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1599        if (err < 0) {
1600                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1601                goto out;
1602        }
1603        xs_reclassify_socket4(sock);
1604
1605        if (xs_bind4(transport, sock)) {
1606                sock_release(sock);
1607                goto out;
1608        }
1609
1610        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1611                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1612
1613        xs_udp_finish_connecting(xprt, sock);
1614        status = 0;
1615out:
1616        xprt_clear_connecting(xprt);
1617        xprt_wake_pending_tasks(xprt, status);
1618}
1619
1620/**
1621 * xs_udp_connect_worker6 - set up a UDP socket
1622 * @work: RPC transport to connect
1623 *
1624 * Invoked by a work queue tasklet.
1625 */
1626static void xs_udp_connect_worker6(struct work_struct *work)
1627{
1628        struct sock_xprt *transport =
1629                container_of(work, struct sock_xprt, connect_worker.work);
1630        struct rpc_xprt *xprt = &transport->xprt;
1631        struct socket *sock = transport->sock;
1632        int err, status = -EIO;
1633
1634        if (xprt->shutdown)
1635                goto out;
1636
1637        /* Start by resetting any existing state */
1638        xs_reset_transport(transport);
1639
1640        err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1641        if (err < 0) {
1642                dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1643                goto out;
1644        }
1645        xs_reclassify_socket6(sock);
1646
1647        if (xs_bind6(transport, sock) < 0) {
1648                sock_release(sock);
1649                goto out;
1650        }
1651
1652        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1653                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1654
1655        xs_udp_finish_connecting(xprt, sock);
1656        status = 0;
1657out:
1658        xprt_clear_connecting(xprt);
1659        xprt_wake_pending_tasks(xprt, status);
1660}
1661
1662/*
1663 * We need to preserve the port number so the reply cache on the server can
1664 * find our cached RPC replies when we get around to reconnecting.
1665 */
1666static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1667{
1668        int result;
1669        struct sockaddr any;
1670
1671        dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1672
1673        /*
1674         * Disconnect the transport socket by doing a connect operation
1675         * with AF_UNSPEC.  This should return immediately...
1676         */
1677        memset(&any, 0, sizeof(any));
1678        any.sa_family = AF_UNSPEC;
1679        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1680        if (!result)
1681                xs_sock_mark_closed(xprt);
1682        else
1683                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1684                                result);
1685}
1686
1687static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1688{
1689        unsigned int state = transport->inet->sk_state;
1690
1691        if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1692                return;
1693        if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1694                return;
1695        xs_abort_connection(xprt, transport);
1696}
1697
1698static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1699{
1700        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1701
1702        if (!transport->inet) {
1703                struct sock *sk = sock->sk;
1704
1705                write_lock_bh(&sk->sk_callback_lock);
1706
1707                xs_save_old_callbacks(transport, sk);
1708
1709                sk->sk_user_data = xprt;
1710                sk->sk_data_ready = xs_tcp_data_ready;
1711                sk->sk_state_change = xs_tcp_state_change;
1712                sk->sk_write_space = xs_tcp_write_space;
1713                sk->sk_error_report = xs_error_report;
1714                sk->sk_allocation = GFP_ATOMIC;
1715
1716                /* socket options */
1717                sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1718                sock_reset_flag(sk, SOCK_LINGER);
1719                tcp_sk(sk)->linger2 = 0;
1720                tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1721
1722                xprt_clear_connected(xprt);
1723
1724                /* Reset to new socket */
1725                transport->sock = sock;
1726                transport->inet = sk;
1727
1728                write_unlock_bh(&sk->sk_callback_lock);
1729        }
1730
1731        if (!xprt_bound(xprt))
1732                return -ENOTCONN;
1733
1734        /* Tell the socket layer to start connecting... */
1735        xprt->stat.connect_count++;
1736        xprt->stat.connect_start = jiffies;
1737        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1738}
1739
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Called by the xs_tcp_connect_worker4/6 work handlers.
 *
 * When the transport has no socket, one is obtained from @create_sock.
 * Otherwise the existing socket is passed to xs_tcp_reuse_connection();
 * if XPRT_CONNECTION_ABORT was set, the connect attempt is then
 * abandoned and pending tasks are woken with -EAGAIN.
 *
 * Once a connect has been issued, the "connecting" state is cleared
 * and the function returns without waking pending tasks, whatever the
 * connect status was.  Pending tasks are only woken here on the early
 * exits: transport shutdown (-EIO), socket creation failure (the error
 * from @create_sock) and an aborted connection (-EAGAIN).
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
                struct sock_xprt *transport,
                struct socket *(*create_sock)(struct rpc_xprt *,
                        struct sock_xprt *))
{
        struct socket *sock = transport->sock;
        int status = -EIO;

        if (xprt->shutdown)
                goto out;

        if (sock != NULL) {
                int abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
                                &xprt->state);

                /* "close" the socket, preserving the local port */
                xs_tcp_reuse_connection(xprt, transport);

                if (abort_and_exit)
                        goto out_eagain;
        } else {
                clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
                sock = create_sock(xprt, transport);
                if (IS_ERR(sock)) {
                        status = PTR_ERR(sock);
                        goto out;
                }
        }

        dprintk("RPC:       worker connecting xprt %p to address: %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

        status = xs_tcp_finish_connecting(xprt, sock);
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt),
                        sock->sk->sk_state);

        switch (status) {
        case 0:
        case -EINPROGRESS:
        case -EALREADY:
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENETUNREACH:
                /* retry with existing socket, after a delay */
                break;
        default:
                printk("%s: connect returned unhandled error %d\n",
                        __func__, status);
                /* fall through */
        case -EADDRNOTAVAIL:
                /* We're probably in TIME_WAIT. Get rid of existing socket,
                 * and retry
                 */
                set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
                xprt_force_disconnect(xprt);
                break;
        }

        /* A connect was attempted: leave pending tasks alone */
        xprt_clear_connecting(xprt);
        return;

out_eagain:
        status = -EAGAIN;
out:
        xprt_clear_connecting(xprt);
        xprt_wake_pending_tasks(xprt, status);
}
1811
1812static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1813                struct sock_xprt *transport)
1814{
1815        struct socket *sock;
1816        int err;
1817
1818        /* start from scratch */
1819        err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1820        if (err < 0) {
1821                dprintk("RPC:       can't create TCP transport socket (%d).\n",
1822                                -err);
1823                goto out_err;
1824        }
1825        xs_reclassify_socket4(sock);
1826
1827        if (xs_bind4(transport, sock) < 0) {
1828                sock_release(sock);
1829                goto out_err;
1830        }
1831        return sock;
1832out_err:
1833        return ERR_PTR(-EIO);
1834}
1835
1836/**
1837 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1838 * @work: RPC transport to connect
1839 *
1840 * Invoked by a work queue tasklet.
1841 */
1842static void xs_tcp_connect_worker4(struct work_struct *work)
1843{
1844        struct sock_xprt *transport =
1845                container_of(work, struct sock_xprt, connect_worker.work);
1846        struct rpc_xprt *xprt = &transport->xprt;
1847
1848        xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1849}
1850
1851static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1852                struct sock_xprt *transport)
1853{
1854        struct socket *sock;
1855        int err;
1856
1857        /* start from scratch */
1858        err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1859        if (err < 0) {
1860                dprintk("RPC:       can't create TCP transport socket (%d).\n",
1861                                -err);
1862                goto out_err;
1863        }
1864        xs_reclassify_socket6(sock);
1865
1866        if (xs_bind6(transport, sock) < 0) {
1867                sock_release(sock);
1868                goto out_err;
1869        }
1870        return sock;
1871out_err:
1872        return ERR_PTR(-EIO);
1873}
1874
1875/**
1876 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1877 * @work: RPC transport to connect
1878 *
1879 * Invoked by a work queue tasklet.
1880 */
1881static void xs_tcp_connect_worker6(struct work_struct *work)
1882{
1883        struct sock_xprt *transport =
1884                container_of(work, struct sock_xprt, connect_worker.work);
1885        struct rpc_xprt *xprt = &transport->xprt;
1886
1887        xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
1888}
1889
1890/**
1891 * xs_connect - connect a socket to a remote endpoint
1892 * @task: address of RPC task that manages state of connect request
1893 *
1894 * TCP: If the remote end dropped the connection, delay reconnecting.
1895 *
1896 * UDP socket connects are synchronous, but we use a work queue anyway
1897 * to guarantee that even unprivileged user processes can set up a
1898 * socket on a privileged port.
1899 *
1900 * If a UDP socket connect fails, the delay behavior here prevents
1901 * retry floods (hard mounts).
1902 */
1903static void xs_connect(struct rpc_task *task)
1904{
1905        struct rpc_xprt *xprt = task->tk_xprt;
1906        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1907
1908        if (xprt_test_and_set_connecting(xprt))
1909                return;
1910
1911        if (transport->sock != NULL) {
1912                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
1913                                "seconds\n",
1914                                xprt, xprt->reestablish_timeout / HZ);
1915                queue_delayed_work(rpciod_workqueue,
1916                                   &transport->connect_worker,
1917                                   xprt->reestablish_timeout);
1918                xprt->reestablish_timeout <<= 1;
1919                if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1920                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1921        } else {
1922                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
1923                queue_delayed_work(rpciod_workqueue,
1924                                   &transport->connect_worker, 0);
1925        }
1926}
1927
1928static void xs_tcp_connect(struct rpc_task *task)
1929{
1930        struct rpc_xprt *xprt = task->tk_xprt;
1931
1932        /* Exit if we need to wait for socket shutdown to complete */
1933        if (test_bit(XPRT_CLOSING, &xprt->state))
1934                return;
1935        xs_connect(task);
1936}
1937
1938/**
1939 * xs_udp_print_stats - display UDP socket-specifc stats
1940 * @xprt: rpc_xprt struct containing statistics
1941 * @seq: output file
1942 *
1943 */
1944static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1945{
1946        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1947
1948        seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1949                        transport->port,
1950                        xprt->stat.bind_count,
1951                        xprt->stat.sends,
1952                        xprt->stat.recvs,
1953                        xprt->stat.bad_xids,
1954                        xprt->stat.req_u,
1955                        xprt->stat.bklog_u);
1956}
1957
1958/**
1959 * xs_tcp_print_stats - display TCP socket-specifc stats
1960 * @xprt: rpc_xprt struct containing statistics
1961 * @seq: output file
1962 *
1963 */
1964static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1965{
1966        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1967        long idle_time = 0;
1968
1969        if (xprt_connected(xprt))
1970                idle_time = (long)(jiffies - xprt->last_used) / HZ;
1971
1972        seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1973                        transport->port,
1974                        xprt->stat.bind_count,
1975                        xprt->stat.connect_count,
1976                        xprt->stat.connect_time,
1977                        idle_time,
1978                        xprt->stat.sends,
1979                        xprt->stat.recvs,
1980                        xprt->stat.bad_xids,
1981                        xprt->stat.req_u,
1982                        xprt->stat.bklog_u);
1983}
1984
1985static struct rpc_xprt_ops xs_udp_ops = {
1986        .set_buffer_size        = xs_udp_set_buffer_size,
1987        .reserve_xprt           = xprt_reserve_xprt_cong,
1988        .release_xprt           = xprt_release_xprt_cong,
1989        .rpcbind                = rpcb_getport_async,
1990        .set_port               = xs_set_port,
1991        .connect                = xs_connect,
1992        .buf_alloc              = rpc_malloc,
1993        .buf_free               = rpc_free,
1994        .send_request           = xs_udp_send_request,
1995        .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
1996        .timer                  = xs_udp_timer,
1997        .release_request        = xprt_release_rqst_cong,
1998        .close                  = xs_close,
1999        .destroy                = xs_destroy,
2000        .print_stats            = xs_udp_print_stats,
2001};
2002
2003static struct rpc_xprt_ops xs_tcp_ops = {
2004        .reserve_xprt           = xprt_reserve_xprt,
2005        .release_xprt           = xs_tcp_release_xprt,
2006        .rpcbind                = rpcb_getport_async,
2007        .set_port               = xs_set_port,
2008        .connect                = xs_tcp_connect,
2009        .buf_alloc              = rpc_malloc,
2010        .buf_free               = rpc_free,
2011        .send_request           = xs_tcp_send_request,
2012        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2013        .close                  = xs_tcp_close,
2014        .destroy                = xs_destroy,
2015        .print_stats            = xs_tcp_print_stats,
2016};
2017
2018static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2019                                      unsigned int slot_table_size)
2020{
2021        struct rpc_xprt *xprt;
2022        struct sock_xprt *new;
2023
2024        if (args->addrlen > sizeof(xprt->addr)) {
2025                dprintk("RPC:       xs_setup_xprt: address too large\n");
2026                return ERR_PTR(-EBADF);
2027        }
2028
2029        new = kzalloc(sizeof(*new), GFP_KERNEL);
2030        if (new == NULL) {
2031                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2032                                "rpc_xprt\n");
2033                return ERR_PTR(-ENOMEM);
2034        }
2035        xprt = &new->xprt;
2036
2037        xprt->max_reqs = slot_table_size;
2038        xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2039        if (xprt->slot == NULL) {
2040                kfree(xprt);
2041                dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2042                                "table\n");
2043                return ERR_PTR(-ENOMEM);
2044        }
2045
2046        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2047        xprt->addrlen = args->addrlen;
2048        if (args->srcaddr)
2049                memcpy(&new->addr, args->srcaddr, args->addrlen);
2050
2051        return xprt;
2052}
2053
2054static const struct rpc_timeout xs_udp_default_timeout = {
2055        .to_initval = 5 * HZ,
2056        .to_maxval = 30 * HZ,
2057        .to_increment = 5 * HZ,
2058        .to_retries = 5,
2059};
2060
2061/**
2062 * xs_setup_udp - Set up transport to use a UDP socket
2063 * @args: rpc transport creation arguments
2064 *
2065 */
2066static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2067{
2068        struct sockaddr *addr = args->dstaddr;
2069        struct rpc_xprt *xprt;
2070        struct sock_xprt *transport;
2071
2072        xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2073        if (IS_ERR(xprt))
2074                return xprt;
2075        transport = container_of(xprt, struct sock_xprt, xprt);
2076
2077        xprt->prot = IPPROTO_UDP;
2078        xprt->tsh_size = 0;
2079        /* XXX: header size can vary due to auth type, IPv6, etc. */
2080        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2081
2082        xprt->bind_timeout = XS_BIND_TO;
2083        xprt->connect_timeout = XS_UDP_CONN_TO;
2084        xprt->reestablish_timeout = XS_UDP_REEST_TO;
2085        xprt->idle_timeout = XS_IDLE_DISC_TO;
2086
2087        xprt->ops = &xs_udp_ops;
2088
2089        xprt->timeout = &xs_udp_default_timeout;
2090
2091        switch (addr->sa_family) {
2092        case AF_INET:
2093                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2094                        xprt_set_bound(xprt);
2095
2096                INIT_DELAYED_WORK(&transport->connect_worker,
2097                                        xs_udp_connect_worker4);
2098                xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2099                break;
2100        case AF_INET6:
2101                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2102                        xprt_set_bound(xprt);
2103
2104                INIT_DELAYED_WORK(&transport->connect_worker,
2105                                        xs_udp_connect_worker6);
2106                xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2107                break;
2108        default:
2109                kfree(xprt);
2110                return ERR_PTR(-EAFNOSUPPORT);
2111        }
2112
2113        dprintk("RPC:       set up transport to address %s\n",
2114                        xprt->address_strings[RPC_DISPLAY_ALL]);
2115
2116        if (try_module_get(THIS_MODULE))
2117                return xprt;
2118
2119        kfree(xprt->slot);
2120        kfree(xprt);
2121        return ERR_PTR(-EINVAL);
2122}
2123
2124static const struct rpc_timeout xs_tcp_default_timeout = {
2125        .to_initval = 60 * HZ,
2126        .to_maxval = 60 * HZ,
2127        .to_retries = 2,
2128};
2129
2130/**
2131 * xs_setup_tcp - Set up transport to use a TCP socket
2132 * @args: rpc transport creation arguments
2133 *
2134 */
2135static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2136{
2137        struct sockaddr *addr = args->dstaddr;
2138        struct rpc_xprt *xprt;
2139        struct sock_xprt *transport;
2140
2141        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2142        if (IS_ERR(xprt))
2143                return xprt;
2144        transport = container_of(xprt, struct sock_xprt, xprt);
2145
2146        xprt->prot = IPPROTO_TCP;
2147        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2148        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2149
2150        xprt->bind_timeout = XS_BIND_TO;
2151        xprt->connect_timeout = XS_TCP_CONN_TO;
2152        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2153        xprt->idle_timeout = XS_IDLE_DISC_TO;
2154
2155        xprt->ops = &xs_tcp_ops;
2156        xprt->timeout = &xs_tcp_default_timeout;
2157
2158        switch (addr->sa_family) {
2159        case AF_INET:
2160                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2161                        xprt_set_bound(xprt);
2162
2163                INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2164                xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2165                break;
2166        case AF_INET6:
2167                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2168                        xprt_set_bound(xprt);
2169
2170                INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2171                xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2172                break;
2173        default:
2174                kfree(xprt);
2175                return ERR_PTR(-EAFNOSUPPORT);
2176        }
2177
2178        dprintk("RPC:       set up transport to address %s\n",
2179                        xprt->address_strings[RPC_DISPLAY_ALL]);
2180
2181        if (try_module_get(THIS_MODULE))
2182                return xprt;
2183
2184        kfree(xprt->slot);
2185        kfree(xprt);
2186        return ERR_PTR(-EINVAL);
2187}
2188
2189static struct xprt_class        xs_udp_transport = {
2190        .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2191        .name           = "udp",
2192        .owner          = THIS_MODULE,
2193        .ident          = IPPROTO_UDP,
2194        .setup          = xs_setup_udp,
2195};
2196
2197static struct xprt_class        xs_tcp_transport = {
2198        .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2199        .name           = "tcp",
2200        .owner          = THIS_MODULE,
2201        .ident          = IPPROTO_TCP,
2202        .setup          = xs_setup_tcp,
2203};
2204
2205/**
2206 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2207 *
2208 */
2209int init_socket_xprt(void)
2210{
2211#ifdef RPC_DEBUG
2212        if (!sunrpc_table_header)
2213                sunrpc_table_header = register_sysctl_table(sunrpc_table);
2214#endif
2215
2216        xprt_register_transport(&xs_udp_transport);
2217        xprt_register_transport(&xs_tcp_transport);
2218
2219        return 0;
2220}
2221
2222/**
2223 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2224 *
2225 */
2226void cleanup_socket_xprt(void)
2227{
2228#ifdef RPC_DEBUG
2229        if (sunrpc_table_header) {
2230                unregister_sysctl_table(sunrpc_table_header);
2231                sunrpc_table_header = NULL;
2232        }
2233#endif
2234
2235        xprt_unregister_transport(&xs_udp_transport);
2236        xprt_unregister_transport(&xs_tcp_transport);
2237}
2238