   1/*
   2 * linux/net/sunrpc/xprtsock.c
   3 *
   4 * Client-side transport implementation for sockets.
   5 *
   6 * TCP callback races fixes (C) 1998 Red Hat
   7 * TCP send fixes (C) 1998 Red Hat
   8 * TCP NFS related read + write fixes
   9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  10 *
  11 * Rewrite of larges part of the code in order to stabilize TCP stuff.
  12 * Fix behaviour when socket buffer is full.
  13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  14 *
  15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
  16 *
  17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
  18 *   <gilles.quillard@bull.net>
  19 */
  20
  21#include <linux/types.h>
  22#include <linux/string.h>
  23#include <linux/slab.h>
  24#include <linux/module.h>
  25#include <linux/capability.h>
  26#include <linux/pagemap.h>
  27#include <linux/errno.h>
  28#include <linux/socket.h>
  29#include <linux/in.h>
  30#include <linux/net.h>
  31#include <linux/mm.h>
  32#include <linux/un.h>
  33#include <linux/udp.h>
  34#include <linux/tcp.h>
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/sched.h>
  37#include <linux/sunrpc/svcsock.h>
  38#include <linux/sunrpc/xprtsock.h>
  39#include <linux/file.h>
  40#ifdef CONFIG_SUNRPC_BACKCHANNEL
  41#include <linux/sunrpc/bc_xprt.h>
  42#endif
  43
  44#include <net/sock.h>
  45#include <net/checksum.h>
  46#include <net/udp.h>
  47#include <net/tcp.h>
  48
  49#include "sunrpc.h"
  50
static void xs_close(struct rpc_xprt *xprt);

/*
 * xprtsock tunables
 *
 * Slot table entries bound the number of concurrently queued RPC
 * requests per transport; the resvport values bound the range of
 * privileged source ports the client may bind to.  All of these are
 * exported through the sysctl table below when RPC_DEBUG is set.
 */
static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* How long a closing TCP socket lingers waiting for the peer's FIN */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
  65
  66/*
  67 * We can register our own files under /proc/sys/sunrpc by
  68 * calling register_sysctl_table() again.  The files in that
  69 * directory become the union of all files registered there.
  70 *
  71 * We simply need to make sure that we don't collide with
  72 * someone else's file names!
  73 */
  74
#ifdef RPC_DEBUG

/* Hard limits applied by proc_dointvec_minmax on the tunables below */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/* Handle used to unregister the sysctl table again */
static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

/* Parent node: the files above appear under /proc/sys/sunrpc/ */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};

#endif
 155
/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA		/* define this to get hex dumps of RPC packets */
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
 190
 191#ifdef RPC_DEBUG_DATA
 192static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 193{
 194        u8 *buf = (u8 *) packet;
 195        int j;
 196
 197        dprintk("RPC:       %s\n", msg);
 198        for (j = 0; j < count && j < 128; j += 4) {
 199                if (!(j & 31)) {
 200                        if (j)
 201                                dprintk("\n");
 202                        dprintk("0x%04x ", j);
 203                }
 204                dprintk("%02x%02x%02x%02x ",
 205                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
 206        }
 207        dprintk("\n");
 208}
 209#else
 210static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 211{
 212        /* NOP */
 213}
 214#endif
 215
/*
 * Socket-based transport instance; embeds the generic rpc_xprt so the
 * xs_* methods can recover it with container_of().
 */
struct sock_xprt {
	struct rpc_xprt		xprt;	/* embedded generic transport */

	/*
	 * Network layer
	 */
	struct socket *		sock;	/* kernel socket, NULL when torn down */
	struct sock *		inet;	/* underlying struct sock, NULL when torn down */

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,	/* record marker being assembled */
				tcp_xid,	/* XID of the record being received */
				tcp_calldir;	/* RPC call-direction word of the record */

	u32			tcp_offset,	/* bytes consumed of current record */
				tcp_reclen;	/* length of current record fragment */

	unsigned long		tcp_copied,	/* bytes copied to the request so far */
				tcp_flags;	/* TCP_RCV_* and TCP_RPC_* state bits */

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;	/* deferred (re)connect work */
	struct sockaddr_storage	srcaddr;	/* local address to bind to */
	unsigned short		srcport;	/* local port; 0 after a reset */

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses, restored on teardown by
	 * xs_restore_old_callbacks()
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};
 258
 259/*
 260 * TCP receive state flags
 261 */
 262#define TCP_RCV_LAST_FRAG       (1UL << 0)
 263#define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
 264#define TCP_RCV_COPY_XID        (1UL << 2)
 265#define TCP_RCV_COPY_DATA       (1UL << 3)
 266#define TCP_RCV_READ_CALLDIR    (1UL << 4)
 267#define TCP_RCV_COPY_CALLDIR    (1UL << 5)
 268
 269/*
 270 * TCP RPC flags
 271 */
 272#define TCP_RPC_REPLY           (1UL << 6)
 273
/* View the transport's stored peer address as a generic sockaddr. */
static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

/* View the stored peer address as an AF_LOCAL address. */
static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
	return (struct sockaddr_un *) &xprt->addr;
}

/* View the stored peer address as an IPv4 address. */
static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

/* View the stored peer address as an IPv6 address. */
static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}
 293
 294static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
 295{
 296        struct sockaddr *sap = xs_addr(xprt);
 297        struct sockaddr_in6 *sin6;
 298        struct sockaddr_in *sin;
 299        struct sockaddr_un *sun;
 300        char buf[128];
 301
 302        switch (sap->sa_family) {
 303        case AF_LOCAL:
 304                sun = xs_addr_un(xprt);
 305                strlcpy(buf, sun->sun_path, sizeof(buf));
 306                xprt->address_strings[RPC_DISPLAY_ADDR] =
 307                                                kstrdup(buf, GFP_KERNEL);
 308                break;
 309        case AF_INET:
 310                (void)rpc_ntop(sap, buf, sizeof(buf));
 311                xprt->address_strings[RPC_DISPLAY_ADDR] =
 312                                                kstrdup(buf, GFP_KERNEL);
 313                sin = xs_addr_in(xprt);
 314                snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 315                break;
 316        case AF_INET6:
 317                (void)rpc_ntop(sap, buf, sizeof(buf));
 318                xprt->address_strings[RPC_DISPLAY_ADDR] =
 319                                                kstrdup(buf, GFP_KERNEL);
 320                sin6 = xs_addr_in6(xprt);
 321                snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 322                break;
 323        default:
 324                BUG();
 325        }
 326
 327        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 328}
 329
 330static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
 331{
 332        struct sockaddr *sap = xs_addr(xprt);
 333        char buf[128];
 334
 335        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 336        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 337
 338        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 339        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 340}
 341
 342static void xs_format_peer_addresses(struct rpc_xprt *xprt,
 343                                     const char *protocol,
 344                                     const char *netid)
 345{
 346        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 347        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 348        xs_format_common_peer_addresses(xprt);
 349        xs_format_common_peer_ports(xprt);
 350}
 351
 352static void xs_update_peer_port(struct rpc_xprt *xprt)
 353{
 354        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 355        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 356
 357        xs_format_common_peer_ports(xprt);
 358}
 359
 360static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 361{
 362        unsigned int i;
 363
 364        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 365                switch (i) {
 366                case RPC_DISPLAY_PROTO:
 367                case RPC_DISPLAY_NETID:
 368                        continue;
 369                default:
 370                        kfree(xprt->address_strings[i]);
 371                }
 372}
 373
 374#define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
 375
 376static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 377{
 378        struct msghdr msg = {
 379                .msg_name       = addr,
 380                .msg_namelen    = addrlen,
 381                .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
 382        };
 383        struct kvec iov = {
 384                .iov_base       = vec->iov_base + base,
 385                .iov_len        = vec->iov_len - base,
 386        };
 387
 388        if (iov.iov_len != 0)
 389                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
 390        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
 391}
 392
/*
 * Send the page-array section of @xdr, starting @base bytes into it,
 * using the socket's sendpage op so the payload is not copied.
 * @more: caller still has data to send after these pages.
 *
 * Returns the number of bytes handed to the socket, or, if nothing at
 * all was sent, the (possibly negative) sendpage result.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	/* Find the page containing @base and the offset within it */
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		/* More data follows: either further pages or @more */
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		/* Stop on completion, error, or a short send */
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;	/* subsequent pages start at offset 0 */
	}
	/* Report an error only if no progress at all was made */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
 423
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 * Sends the head kvec, the page array, and the tail kvec of @xdr in
 * order, resuming @base bytes in.  Returns the number of bytes handed
 * to the socket, or a negative errno if no progress at all was made.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	/* Only pass the destination address on the first fragment */
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	/* Section 1: the head kvec (always entered for a fresh datagram) */
	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	/* Section 2: the page array */
	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	/* Section 3: the tail kvec */
	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	/* Report an error only if no progress at all was made */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
 479
 480static void xs_nospace_callback(struct rpc_task *task)
 481{
 482        struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
 483
 484        transport->inet->sk_write_pending--;
 485        clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 486}
 487
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Returns -EAGAIN if the task was queued to wait for buffer space,
 * or -ENOTCONN if the transport disconnected in the meantime.
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -EAGAIN;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		/* Only wait if the socket really is still full */
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
 527
 528/*
 529 * Construct a stream transport record marker in @buf.
 530 */
 531static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 532{
 533        u32 reclen = buf->len - sizeof(rpc_fraghdr);
 534        rpc_fraghdr *base = buf->head[0].iov_base;
 535        *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
 536}
 537
/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	/* AF_LOCAL is a stream transport: prepend the record marker */
	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	status = xs_sendpages(transport->sock, NULL, 0,
						xdr, req->rq_bytes_sent);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);
	if (likely(status >= 0)) {
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		/* Partial send: retry from the new offset later */
		status = -EAGAIN;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through: treat unknown errors like a broken pipe */
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}
 591
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Can't send a datagram until the peer port is known */
	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through: clear nospace for unknown errors too */
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
 654
 655/**
 656 * xs_tcp_shutdown - gracefully shut down a TCP socket
 657 * @xprt: transport
 658 *
 659 * Initiates a graceful shutdown of the TCP socket by calling the
 660 * equivalent of shutdown(SHUT_WR);
 661 */
 662static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 663{
 664        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 665        struct socket *sock = transport->sock;
 666
 667        if (sock != NULL)
 668                kernel_sock_shutdown(sock, SHUT_WR);
 669}
 670
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial progress: keep pushing from the new offset */
		if (status != 0)
			continue;
		/* Zero bytes accepted: socket is full, go wait for space */
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through: treat unknown errors as a connection reset */
	case -ECONNRESET:
		xs_tcp_shutdown(xprt);
		/* fall through */
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EPIPE:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
 748
 749/**
 750 * xs_tcp_release_xprt - clean up after a tcp transmission
 751 * @xprt: transport
 752 * @task: rpc task
 753 *
 754 * This cleans up if an error causes us to abort the transmission of a request.
 755 * In this case, the socket may need to be reset in order to avoid confusing
 756 * the server.
 757 */
 758static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 759{
 760        struct rpc_rqst *req;
 761
 762        if (task != xprt->snd_task)
 763                return;
 764        if (task == NULL)
 765                goto out_release;
 766        req = task->tk_rqstp;
 767        if (req == NULL)
 768                goto out_release;
 769        if (req->rq_bytes_sent == 0)
 770                goto out_release;
 771        if (req->rq_bytes_sent == req->rq_snd_buf.len)
 772                goto out_release;
 773        set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
 774out_release:
 775        xprt_release_xprt(xprt, task);
 776}
 777
/*
 * Remember the socket's original callbacks before xprtsock installs
 * its own, so xs_restore_old_callbacks() can undo this at teardown.
 */
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
}
 784
/*
 * Put back the callbacks saved by xs_save_old_callbacks(); called
 * while detaching the socket from the transport.
 */
static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
}
 791
/*
 * Detach and release the transport's socket.
 *
 * The transport pointers and the socket callbacks are torn down under
 * sk_callback_lock so no xprtsock callback can observe a half-reset
 * transport.  Safe to call when no socket is attached.
 */
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)		/* never connected, or already reset */
		return;

	transport->srcport = 0;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	/* NOTE(review): presumably undoes a checksum-disable applied at
	 * socket setup for UDP -- confirm against the finish_connecting
	 * paths elsewhere in this file. */
	sk->sk_no_check = 0;

	sock_release(sock);
}
 815
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	/* Barrier pair: make the reset above visible before the state
	 * bits are cleared, and the clears visible before anything that
	 * follows on other CPUs. */
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}
 842
 843static void xs_tcp_close(struct rpc_xprt *xprt)
 844{
 845        if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
 846                xs_close(xprt);
 847        else
 848                xs_tcp_shutdown(xprt);
 849}
 850
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	/* Stop any queued or running connect attempt before teardown */
	cancel_delayed_work_sync(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	module_put(THIS_MODULE);
}
 869
/* Recover the rpc_xprt stored in sk->sk_user_data; NULL once
 * xs_reset_transport() has detached the socket. */
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}
 874
 875static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
 876{
 877        struct xdr_skb_reader desc = {
 878                .skb            = skb,
 879                .offset         = sizeof(rpc_fraghdr),
 880                .count          = skb->len - sizeof(rpc_fraghdr),
 881        };
 882
 883        if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
 884                return -1;
 885        if (desc.count)
 886                return -1;
 887        return 0;
 888}
 889
 890/**
 891 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 892 * @sk: socket with data to read
 893 * @len: how much data to read
 894 *
 895 * Currently this assumes we can read the whole reply in a single gulp.
 896 */
static void xs_local_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid;
        __be32 *xp;

        /* sk_callback_lock keeps the xprt from vanishing underneath us */
        read_lock_bh(&sk->sk_callback_lock);
        dprintk("RPC:       %s...\n", __func__);
        xprt = xprt_from_sock(sk);
        if (xprt == NULL)
                goto out;

        /* Non-blocking receive of one complete datagram */
        skb = skb_recv_datagram(sk, 0, 1, &err);
        if (skb == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        /* Payload must at least hold a 4-byte XID past the record marker */
        repsize = skb->len - sizeof(rpc_fraghdr);
        if (repsize < 4) {
                dprintk("RPC:       impossible RPC reply size %d\n", repsize);
                goto dropit;
        }

        /* Copy the XID from the skb... */
        xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
        if (xp == NULL)
                goto dropit;

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->transport_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        /* Never report more than the receive buffer can hold */
        copied = rovr->rq_private_buf.buflen;
        if (copied > repsize)
                copied = repsize;

        if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
                dprintk("RPC:       sk_buff copy failed\n");
                goto out_unlock;
        }

        xprt_complete_rqst(task, copied);

 out_unlock:
        spin_unlock(&xprt->transport_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        read_unlock_bh(&sk->sk_callback_lock);
}
 956
 957/**
 958 * xs_udp_data_ready - "data ready" callback for UDP sockets
 959 * @sk: socket with data to read
 960 * @len: how much data to read
 961 *
 962 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid;
        __be32 *xp;

        /* sk_callback_lock keeps the xprt from vanishing underneath us */
        read_lock_bh(&sk->sk_callback_lock);
        dprintk("RPC:       xs_udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                goto out;

        /* Non-blocking receive of one complete datagram */
        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        /* Payload must at least hold a 4-byte XID past the UDP header */
        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Copy the XID from the skb... */
        xp = skb_header_pointer(skb, sizeof(struct udphdr),
                                sizeof(_xid), &_xid);
        if (xp == NULL)
                goto dropit;

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->transport_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        /* Never report more than the receive buffer can hold */
        if ((copied = rovr->rq_private_buf.buflen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
                UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
                goto out_unlock;
        }

        UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

        /* A reply arrived: grow the congestion window and finish the rqst */
        xprt_adjust_cwnd(task, copied);
        xprt_complete_rqst(task, copied);

 out_unlock:
        spin_unlock(&xprt->transport_lock);
 dropit:
        skb_free_datagram(sk, skb);
 out:
        read_unlock_bh(&sk->sk_callback_lock);
}
1024
1025/*
1026 * Helper function to force a TCP close if the server is sending
1027 * junk and/or it has put us in CLOSE_WAIT
1028 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
        /* Request a hard close (see xs_tcp_close()) and kick off the
         * disconnect machinery. */
        set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
        xprt_force_disconnect(xprt);
}
1034
/*
 * Accumulate the 4-byte RPC record marker from the stream, possibly
 * across multiple calls.  Once complete, latch the fragment length and
 * the last-fragment flag, and advance the receive state machine to
 * reading the XID.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        size_t len, used;
        char *p;

        /* Resume filling tcp_fraghdr at the current partial offset */
        p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
        len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
        used = xdr_skb_read_bits(desc, p, len);
        transport->tcp_offset += used;
        if (used != len)
                return;         /* marker still incomplete; wait for more data */

        /* Top bit of the marker flags the final fragment of the record */
        transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
        if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
                transport->tcp_flags |= TCP_RCV_LAST_FRAG;
        else
                transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
        transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

        transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
        transport->tcp_offset = 0;

        /* Sanity check of the record length */
        if (unlikely(transport->tcp_reclen < 8)) {
                dprintk("RPC:       invalid TCP record fragment length\n");
                xs_tcp_force_close(xprt);
                return;
        }
        dprintk("RPC:       reading TCP record fragment of length %d\n",
                        transport->tcp_reclen);
}
1067
/*
 * If the current record fragment has been fully consumed, rewind the
 * state machine to expect a new fragment header; when this was the last
 * fragment, also prepare for a fresh XID / new RPC message.
 */
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
        if (transport->tcp_offset == transport->tcp_reclen) {
                transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
                transport->tcp_offset = 0;
                if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
                        transport->tcp_flags |= TCP_RCV_COPY_XID;
                        transport->tcp_copied = 0;
                }
        }
}
1080
/*
 * Accumulate the 4-byte XID from the stream, possibly across multiple
 * calls.  Once complete, advance the state machine to reading the
 * call-direction word.
 */
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
        used = xdr_skb_read_bits(desc, p, len);
        transport->tcp_offset += used;
        if (used != len)
                return;         /* XID still incomplete; wait for more data */
        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
        transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
        /* The XID counts toward the data already copied for this message */
        transport->tcp_copied = 4;
        dprintk("RPC:       reading %s XID %08x\n",
                        (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
                                                              : "request with",
                        ntohl(transport->tcp_xid));
        xs_tcp_check_fraghdr(transport);
}
1102
/*
 * Accumulate the 4-byte CALL/REPLY direction word that follows the XID.
 * A bogus direction value forces the connection closed.
 */
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
                                       struct xdr_skb_reader *desc)
{
        size_t len, used;
        u32 offset;
        char *p;

        /*
         * We want transport->tcp_offset to be 8 at the end of this routine
         * (4 bytes for the xid and 4 bytes for the call/reply flag).
         * When this function is called for the first time,
         * transport->tcp_offset is 4 (after having already read the xid).
         */
        offset = transport->tcp_offset - sizeof(transport->tcp_xid);
        len = sizeof(transport->tcp_calldir) - offset;
        dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
        p = ((char *) &transport->tcp_calldir) + offset;
        used = xdr_skb_read_bits(desc, p, len);
        transport->tcp_offset += used;
        if (used != len)
                return;         /* flag still incomplete; wait for more data */
        transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
        /*
         * We don't yet have the XDR buffer, so we will write the calldir
         * out after we get the buffer from the 'struct rpc_rqst'
         */
        switch (ntohl(transport->tcp_calldir)) {
        case RPC_REPLY:
                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
                transport->tcp_flags |= TCP_RCV_COPY_DATA;
                transport->tcp_flags |= TCP_RPC_REPLY;
                break;
        case RPC_CALL:
                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
                transport->tcp_flags |= TCP_RCV_COPY_DATA;
                transport->tcp_flags &= ~TCP_RPC_REPLY;
                break;
        default:
                /* Neither a call nor a reply: the stream is corrupt */
                dprintk("RPC:       invalid request message type\n");
                xs_tcp_force_close(&transport->xprt);
        }
        xs_tcp_check_fraghdr(transport);
}
1146
/*
 * Copy record payload from the stream into @req's private receive
 * buffer, updating tcp_copied/tcp_offset.  On a short copy (usually a
 * buffer allocation failure) the rest of this record is abandoned and
 * the request is left to time out.
 */
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
                                     struct xdr_skb_reader *desc,
                                     struct rpc_rqst *req)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;

        rcvbuf = &req->rq_private_buf;

        if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
                /*
                 * Save the RPC direction in the XDR buffer
                 */
                memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
                        &transport->tcp_calldir,
                        sizeof(transport->tcp_calldir));
                transport->tcp_copied += sizeof(transport->tcp_calldir);
                transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
        }

        /* Clamp the copy to what remains of the current record fragment;
         * use a private reader so only the clamped amount is consumed. */
        len = desc->count;
        if (len > transport->tcp_reclen - transport->tcp_offset) {
                struct xdr_skb_reader my_desc;

                len = transport->tcp_reclen - transport->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
                                          &my_desc, xdr_skb_read_bits);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
                                          desc, xdr_skb_read_bits);

        if (r > 0) {
                transport->tcp_copied += r;
                transport->tcp_offset += r;
        }
        if (r != len) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
                 * is turn off TCP_RCV_COPY_DATA, so the request
                 * will not receive any additional updates,
                 * and time out.
                 * Any remaining data from this record will
                 * be discarded.
                 */
                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
                dprintk("RPC:       XID %08x truncated request\n",
                                ntohl(transport->tcp_xid));
                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
                                "tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, transport->tcp_copied,
                                transport->tcp_offset, transport->tcp_reclen);
                return;
        }

        dprintk("RPC:       XID %08x read %Zd bytes\n",
                        ntohl(transport->tcp_xid), r);
        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
                        transport->tcp_offset, transport->tcp_reclen);

        /* Stop copying once the receive buffer is full, or once the last
         * fragment of the record has been fully consumed. */
        if (transport->tcp_copied == req->rq_private_buf.buflen)
                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        else if (transport->tcp_offset == transport->tcp_reclen) {
                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        }
}
1222
1223/*
1224 * Finds the request corresponding to the RPC xid and invokes the common
1225 * tcp read code to read the data.
1226 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
                                    struct xdr_skb_reader *desc)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct rpc_rqst *req;

        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->transport_lock);
        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
        if (!req) {
                /* Unknown XID: e.g. a reply for a request that already
                 * timed out.  Caller discards the rest of this record. */
                dprintk("RPC:       XID %08x request not found!\n",
                                ntohl(transport->tcp_xid));
                spin_unlock(&xprt->transport_lock);
                return -1;
        }

        xs_tcp_read_common(xprt, desc, req);

        /* Message complete (or abandoned): hand it to the waiting task */
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_rqst(req->rq_task, transport->tcp_copied);

        spin_unlock(&xprt->transport_lock);
        return 0;
}
1254
1255#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1256/*
1257 * Obtains an rpc_rqst previously allocated and invokes the common
1258 * tcp read code to read the data.  The result is placed in the callback
1259 * queue.
1260 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1261 * connection and return -1.
1262 */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
                                       struct xdr_skb_reader *desc)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct rpc_rqst *req;

        /* Grab a preallocated backchannel slot for this incoming call */
        req = xprt_alloc_bc_request(xprt);
        if (req == NULL) {
                printk(KERN_WARNING "Callback slot table overflowed\n");
                xprt_force_disconnect(xprt);
                return -1;
        }

        req->rq_xid = transport->tcp_xid;
        dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
        xs_tcp_read_common(xprt, desc, req);

        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
                struct svc_serv *bc_serv = xprt->bc_serv;

                /*
                 * Add callback request to callback list.  The callback
                 * service sleeps on the sv_cb_waitq waiting for new
                 * requests.  Wake it up after enqueuing the request.
                 */
                dprintk("RPC:       add callback request to list\n");
                spin_lock(&bc_serv->sv_cb_lock);
                list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
                spin_unlock(&bc_serv->sv_cb_lock);
                wake_up(&bc_serv->sv_cb_waitq);
        }

        req->rq_private_buf.len = transport->tcp_copied;

        return 0;
}
1301
1302static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1303                                        struct xdr_skb_reader *desc)
1304{
1305        struct sock_xprt *transport =
1306                                container_of(xprt, struct sock_xprt, xprt);
1307
1308        return (transport->tcp_flags & TCP_RPC_REPLY) ?
1309                xs_tcp_read_reply(xprt, desc) :
1310                xs_tcp_read_callback(xprt, desc);
1311}
1312#else
/* Without backchannel support, inbound data can only be a reply */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
                                        struct xdr_skb_reader *desc)
{
        return xs_tcp_read_reply(xprt, desc);
}
1318#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1319
1320/*
1321 * Read data off the transport.  This can be either an RPC_CALL or an
1322 * RPC_REPLY.  Relay the processing to helper functions.
1323 */
1324static void xs_tcp_read_data(struct rpc_xprt *xprt,
1325                                    struct xdr_skb_reader *desc)
1326{
1327        struct sock_xprt *transport =
1328                                container_of(xprt, struct sock_xprt, xprt);
1329
1330        if (_xs_tcp_read_data(xprt, desc) == 0)
1331                xs_tcp_check_fraghdr(transport);
1332        else {
1333                /*
1334                 * The transport_lock protects the request handling.
1335                 * There's no need to hold it to update the tcp_flags.
1336                 */
1337                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1338        }
1339}
1340
1341static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1342{
1343        size_t len;
1344
1345        len = transport->tcp_reclen - transport->tcp_offset;
1346        if (len > desc->count)
1347                len = desc->count;
1348        desc->count -= len;
1349        desc->offset += len;
1350        transport->tcp_offset += len;
1351        dprintk("RPC:       discarded %Zu bytes\n", len);
1352        xs_tcp_check_fraghdr(transport);
1353}
1354
/*
 * tcp_read_sock() callback: feed the skb through the receive state
 * machine (fragment header -> XID -> call direction -> payload ->
 * discard) until the skb is exhausted.  Returns the number of bytes
 * consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = rd_desc->arg.data;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_skb_reader desc = {
                .skb    = skb,
                .offset = offset,
                .count  = len,
        };

        dprintk("RPC:       xs_tcp_data_recv started\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
                        xs_tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
                        xs_tcp_read_xid(transport, &desc);
                        continue;
                }
                /* Read in the call/reply flag */
                if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
                        xs_tcp_read_calldir(transport, &desc);
                        continue;
                }
                /* Read in the request data */
                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
                        xs_tcp_read_data(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                xs_tcp_read_discard(transport, &desc);
        } while (desc.count);
        dprintk("RPC:       xs_tcp_data_recv done\n");
        return len - desc.count;
}
1394
1395/**
1396 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1397 * @sk: socket with data to read
1398 * @bytes: how much data to read
1399 *
1400 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;
        int read;

        dprintk("RPC:       xs_tcp_data_ready...\n");

        /* sk_callback_lock keeps the xprt from vanishing underneath us */
        read_lock_bh(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* Any data means we had a useful conversation, so
         * the we don't need to delay the next reconnect
         */
        if (xprt->reestablish_timeout)
                xprt->reestablish_timeout = 0;

        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
        do {
                /* Consume up to 64KB per pass until the queue is drained */
                rd_desc.count = 65536;
                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
        } while (read > 0);
out:
        read_unlock_bh(&sk->sk_callback_lock);
}
1430
1431/*
1432 * Do the equivalent of linger/linger2 handling for dealing with
1433 * broken servers that don't close the socket in a timely
1434 * fashion
1435 */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
                unsigned long timeout)
{
        struct sock_xprt *transport;

        /* Only one linger timeout at a time; reuse the connecting bit
         * as the exclusion mechanism. */
        if (xprt_test_and_set_connecting(xprt))
                return;
        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        transport = container_of(xprt, struct sock_xprt, xprt);
        /* The connect worker doubles as the abort handler after @timeout */
        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
                           timeout);
}
1448
static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
        struct sock_xprt *transport;

        transport = container_of(xprt, struct sock_xprt, xprt);

        /* Undo xs_tcp_schedule_linger_timeout(), but only if the delayed
         * work had not started running yet; otherwise leave the state
         * bits for the worker itself to clean up. */
        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
            !cancel_delayed_work(&transport->connect_worker))
                return;
        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        xprt_clear_connecting(xprt);
}
1461
/* Clear every connection/close state bit, bracketed by memory barriers
 * so the clears are ordered with respect to surrounding state updates. */
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
        smp_mb__after_clear_bit();
}
1471
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
        xs_sock_reset_connection_flags(xprt);
        /* Mark transport as closed and wake up all pending tasks */
        xprt_disconnect_done(xprt);
}
1478
1479/**
1480 * xs_tcp_state_change - callback to handle TCP socket state changes
1481 * @sk: socket whose state has changed
1482 *
1483 */
static void xs_tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock_bh(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
        dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
                        sk->sk_state, xprt_connected(xprt),
                        sock_flag(sk, SOCK_DEAD),
                        sock_flag(sk, SOCK_ZAPPED),
                        sk->sk_shutdown);

        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
                spin_lock(&xprt->transport_lock);
                if (!xprt_test_and_set_connected(xprt)) {
                        struct sock_xprt *transport = container_of(xprt,
                                        struct sock_xprt, xprt);

                        /* Reset TCP record info */
                        transport->tcp_offset = 0;
                        transport->tcp_reclen = 0;
                        transport->tcp_copied = 0;
                        transport->tcp_flags =
                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

                        /* Queued tasks retry their transmission */
                        xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock(&xprt->transport_lock);
                break;
        case TCP_FIN_WAIT1:
                /* The client initiated a shutdown of the socket */
                xprt->connect_cookie++;
                xprt->reestablish_timeout = 0;
                set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
                smp_mb__after_clear_bit();
                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
                xprt->connect_cookie++;
                clear_bit(XPRT_CONNECTED, &xprt->state);
                xs_tcp_force_close(xprt);
                /* fall through */
        case TCP_CLOSING:
                /*
                 * If the server closed down the connection, make sure that
                 * we back off before reconnecting
                 */
                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
                set_bit(XPRT_CLOSING, &xprt->state);
                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
                break;
        case TCP_CLOSE:
                /* Socket fully closed: stop any linger timer and wake
                 * everyone waiting on the transport */
                xs_tcp_cancel_linger_timeout(xprt);
                xs_sock_mark_closed(xprt);
        }
 out:
        read_unlock_bh(&sk->sk_callback_lock);
}
1554
/*
 * Common write-space handler: clear the NOSPACE markers and, if a
 * writer was actually waiting (SOCK_ASYNC_NOSPACE was set), notify the
 * transport layer that it may transmit again.
 */
static void xs_write_space(struct sock *sk)
{
        struct socket *sock;
        struct rpc_xprt *xprt;

        if (unlikely(!(sock = sk->sk_socket)))
                return;
        clear_bit(SOCK_NOSPACE, &sock->flags);

        if (unlikely(!(xprt = xprt_from_sock(sk))))
                return;
        /* Nobody was blocked on buffer space: nothing to wake */
        if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
                return;

        xprt_write_space(xprt);
}
1571
1572/**
1573 * xs_udp_write_space - callback invoked when socket buffer space
1574 *                             becomes available
1575 * @sk: socket whose state has changed
1576 *
1577 * Called when more output buffer space is available for this socket.
1578 * We try not to wake our writers until they can make "significant"
1579 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1580 * with a bunch of small requests.
1581 */
static void xs_udp_write_space(struct sock *sk)
{
        read_lock_bh(&sk->sk_callback_lock);

        /* from net/core/sock.c:sock_def_write_space */
        if (sock_writeable(sk))
                xs_write_space(sk);

        read_unlock_bh(&sk->sk_callback_lock);
}
1592
1593/**
1594 * xs_tcp_write_space - callback invoked when socket buffer space
1595 *                             becomes available
1596 * @sk: socket whose state has changed
1597 *
1598 * Called when more output buffer space is available for this socket.
1599 * We try not to wake our writers until they can make "significant"
1600 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1601 * with a bunch of small requests.
1602 */
static void xs_tcp_write_space(struct sock *sk)
{
        read_lock_bh(&sk->sk_callback_lock);

        /* from net/core/stream.c:sk_stream_write_space */
        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
                xs_write_space(sk);

        read_unlock_bh(&sk->sk_callback_lock);
}
1613
/*
 * Apply the cached snd/rcv buffer sizes to the underlying socket,
 * scaled by the slot count; the *_LOCK flags keep the kernel's
 * auto-tuning from overriding them.  A zero size leaves the socket
 * default untouched.
 */
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sock *sk = transport->inet;

        if (transport->rcvsize) {
                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
        }
        if (transport->sndsize) {
                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
                /* Kick any writer that was waiting on send-buffer space */
                sk->sk_write_space(sk);
        }
}
1629
1630/**
1631 * xs_udp_set_buffer_size - set send and receive limits
1632 * @xprt: generic transport
1633 * @sndsize: requested size of send buffer, in bytes
1634 * @rcvsize: requested size of receive buffer, in bytes
1635 *
1636 * Set socket send and receive buffer size limits.
1637 */
1638static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1639{
1640        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1641
1642        transport->sndsize = 0;
1643        if (sndsize)
1644                transport->sndsize = sndsize + 1024;
1645        transport->rcvsize = 0;
1646        if (rcvsize)
1647                transport->rcvsize = rcvsize + 1024;
1648
1649        xs_udp_do_set_buffer_size(xprt);
1650}
1651
1652/**
1653 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1654 * @task: task that timed out
1655 *
1656 * Adjust the congestion window after a retransmit timeout has occurred.
1657 */
static void xs_udp_timer(struct rpc_task *task)
{
        /* Shrink the congestion window on a retransmit timeout */
        xprt_adjust_cwnd(task, -ETIMEDOUT);
}
1662
1663static unsigned short xs_get_random_port(void)
1664{
1665        unsigned short range = xprt_max_resvport - xprt_min_resvport;
1666        unsigned short rand = (unsigned short) net_random() % range;
1667        return rand + xprt_min_resvport;
1668}
1669
1670/**
1671 * xs_set_port - reset the port number in the remote endpoint address
1672 * @xprt: generic transport
1673 * @port: new port number
1674 *
1675 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
        dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

        /* Patch the port into the stored address and refresh the
         * printable peer-port strings */
        rpc_set_port(xs_addr(xprt), port);
        xs_update_peer_port(xprt);
}
1683
1684static unsigned short xs_get_srcport(struct sock_xprt *transport)
1685{
1686        unsigned short port = transport->srcport;
1687
1688        if (port == 0 && transport->xprt.resvport)
1689                port = xs_get_random_port();
1690        return port;
1691}
1692
1693static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1694{
1695        if (transport->srcport != 0)
1696                transport->srcport = 0;
1697        if (!transport->xprt.resvport)
1698                return 0;
1699        if (port <= xprt_min_resvport || port > xprt_max_resvport)
1700                return xprt_max_resvport;
1701        return --port;
1702}
/*
 * xs_bind - bind @sock to the transport's source address
 *
 * Tries the cached source port first so that a reconnect reuses the
 * same port (keeping server duplicate-reply caches valid).  On
 * -EADDRINUSE it walks downward through the reserved range, allowing
 * at most two wraps of the range before giving up.  A port of zero
 * means "let the kernel choose".
 */
static int xs_bind(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_storage myaddr;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport);
	unsigned short last;

	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
	do {
		rpc_set_port((struct sockaddr *)&myaddr, port);
		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
				transport->xprt.addrlen);
		if (port == 0)
			break;
		if (err == 0) {
			/* Remember the port so later reconnects reuse it */
			transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, port);
		if (port > last)
			nloop++;	/* wrapped around the range */
	} while (err == -EADDRINUSE && nloop != 2);

	if (myaddr.ss_family == AF_INET)
		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in *)&myaddr)->sin_addr,
				port, err ? "failed" : "ok", err);
	else
		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
				port, err ? "failed" : "ok", err);
	return err;
}
1737
/*
 * We don't support autobind on AF_LOCAL sockets
 */
static void xs_local_rpcbind(struct rpc_task *task)
{
	/* AF_LOCAL has no rpcbind step: mark the transport bound so the
	 * RPC state machine skips port discovery entirely. */
	xprt_set_bound(task->tk_xprt);
}
1745
/* AF_LOCAL addresses have no port number; setting one is a no-op. */
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}
1749
1750#ifdef CONFIG_DEBUG_LOCK_ALLOC
1751static struct lock_class_key xs_key[2];
1752static struct lock_class_key xs_slock_key[2];
1753
/* Give RPC-owned AF_LOCAL sockets their own lockdep class names so
 * they are not conflated with ordinary user sockets. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	/* Reclassifying a lock a user might hold would corrupt lockdep state */
	BUG_ON(sock_owned_by_user(sk));
	/* NOTE(review): this shares key slot [1] with AF_INET6 sockets --
	 * confirm the shared lock class is intentional. */
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
}
1762
/* Give RPC-owned IPv4 sockets their own lockdep class names. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	/* Reclassifying a lock a user might hold would corrupt lockdep state */
	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}
1771
/* Give RPC-owned IPv6 sockets their own lockdep class names. */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	/* Reclassifying a lock a user might hold would corrupt lockdep state */
	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
1780
/* Dispatch to the per-family lockdep reclassification helper.
 * Unknown families are silently left with their default lock class. */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
1795#else
/* CONFIG_DEBUG_LOCK_ALLOC disabled: reclassification is a no-op. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
1811#endif
1812
1813static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1814                struct sock_xprt *transport, int family, int type, int protocol)
1815{
1816        struct socket *sock;
1817        int err;
1818
1819        err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1820        if (err < 0) {
1821                dprintk("RPC:       can't create %d transport socket (%d).\n",
1822                                protocol, -err);
1823                goto out;
1824        }
1825        xs_reclassify_socket(family, sock);
1826
1827        err = xs_bind(transport, sock);
1828        if (err) {
1829                sock_release(sock);
1830                goto out;
1831        }
1832
1833        return sock;
1834out:
1835        return ERR_PTR(err);
1836}
1837
/*
 * Install the RPC socket callbacks on a freshly created AF_LOCAL
 * socket (first connect only), then issue a blocking connect to the
 * remote endpoint.  Returns the kernel_connect() result.
 */
static int xs_local_finish_connecting(struct rpc_xprt *xprt,
				      struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
									xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* Swap callbacks under the socket's callback lock so the
		 * network stack never sees a half-initialized socket */
		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_local_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		/* callbacks can run in bh context; must not sleep */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}
1870
/**
 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
 * @work: work item embedded in the sock_xprt being connected
 *
 * Invoked by a work queue tasklet.
 */
static void xs_local_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* NOTE(review): PF_FSTRANS presumably keeps memory reclaim from
	 * re-entering the filesystem during connect -- confirm. */
	current->flags |= PF_FSTRANS;

	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	status = __sock_create(xprt->xprt_net, AF_LOCAL,
					SOCK_STREAM, 0, &sock, 1);
	if (status < 0) {
		dprintk("RPC:       can't create AF_LOCAL "
			"transport socket (%d).\n", -status);
		goto out;
	}
	xs_reclassify_socketu(sock);

	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);

	status = xs_local_finish_connecting(xprt, sock);
	switch (status) {
	case 0:
		dprintk("RPC:       xprt %p connected to %s\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		xprt_set_connected(xprt);
		break;
	case -ENOENT:
		/* Server's socket file is not (yet) present; caller retries */
		dprintk("RPC:       xprt %p: socket %s does not exist\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		break;
	default:
		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
				__func__, -status,
				xprt->address_strings[RPC_DISPLAY_ADDR]);
	}

out:
	xprt_clear_connecting(xprt);
	/* Wake waiters with the final status (0 or a negative errno) */
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
1927
1928#ifdef CONFIG_SUNRPC_SWAP
/* Tag the transport socket as a memalloc socket when the transport is
 * used for swap, so writeback under memory pressure can dip into the
 * emergency reserves and make progress. */
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
			xprt);

	if (xprt->swapper)
		sk_set_memalloc(transport->inet);
}
1937
/**
 * xs_swapper - Tag this transport as being used for swap.
 * @xprt: transport to tag
 * @enable: enable/disable
 *
 * Returns 0 (the function currently cannot fail).
 */
int xs_swapper(struct rpc_xprt *xprt, int enable)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
			xprt);
	int err = 0;

	if (enable) {
		xprt->swapper++;
		xs_set_memalloc(xprt);
	} else if (xprt->swapper) {
		/* NOTE(review): memalloc is cleared as soon as any user
		 * disables, even if swapper is still > 0 -- confirm. */
		xprt->swapper--;
		sk_clear_memalloc(transport->inet);
	}

	return err;
}
EXPORT_SYMBOL_GPL(xs_swapper);
1961#else
/* CONFIG_SUNRPC_SWAP disabled: no memalloc tagging needed. */
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
}
1965#endif
1966
/*
 * Install the RPC socket callbacks on a freshly created UDP socket
 * (first call only) and push the configured buffer sizes down to it.
 * UDP is connectionless, so the transport is marked connected here.
 */
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* Swap callbacks under the socket's callback lock so the
		 * network stack never sees a half-initialized socket */
		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;
		/* callbacks can run in bh context; must not sleep */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		xs_set_memalloc(xprt);

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
}
1996
/*
 * Work-queue handler that (re)creates the UDP socket for a transport:
 * tears down any existing socket state, creates and binds a new
 * datagram socket, and installs the RPC callbacks on it.
 */
static void xs_udp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	current->flags |= PF_FSTRANS;

	/* Start by resetting any existing state */
	xs_reset_transport(transport);
	sock = xs_create_sock(xprt, transport,
			xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
	/* NOTE(review): on failure the specific PTR_ERR is discarded and
	 * waiters see -EIO; the TCP path propagates the real error. */
	if (IS_ERR(sock))
		goto out;

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_udp_finish_connecting(xprt, sock);
	status = 0;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
2030
/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_abort_connection(struct sock_xprt *transport)
{
	int result;
	struct sockaddr any;

	dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	/* Only clear the connection flags once the disconnect succeeded;
	 * otherwise the old state remains visible to the state machine */
	if (!result)
		xs_sock_reset_connection_flags(&transport->xprt);
	dprintk("RPC:       AF_UNSPEC connect return code %d\n", result);
}
2053
/*
 * "Close" the existing TCP socket while keeping the same source port,
 * so that a subsequent reconnect hits the server's duplicate-reply
 * cache.  Skips the abort when the socket has not undergone an
 * orderly shutdown, since there is then nothing to tear down.
 */
static void xs_tcp_reuse_connection(struct sock_xprt *transport)
{
	unsigned int state = transport->inet->sk_state;

	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
				"sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	xs_abort_connection(transport);
}
2079
/*
 * Install the RPC socket callbacks on a freshly created TCP socket
 * (first call only), set TCP-specific socket options, and start a
 * non-blocking connect.  Returns 0 or -EINPROGRESS once the SYN has
 * been sent, or a negative errno from kernel_connect().
 */
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -ENOTCONN;

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* Swap callbacks under the socket's callback lock so the
		 * network stack never sees a half-initialized socket */
		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		/* callbacks can run in bh context; must not sleep */
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		goto out;

	xs_set_memalloc(xprt);

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
	switch (ret) {
	case 0:
	case -EINPROGRESS:
		/* SYN_SENT! */
		xprt->connect_cookie++;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	}
out:
	return ret;
}
2133
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: work item embedded in the sock_xprt being connected
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct socket *sock = transport->sock;
	struct rpc_xprt *xprt = &transport->xprt;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	current->flags |= PF_FSTRANS;

	if (!sock) {
		/* First connect: build a fresh socket */
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through - treat unknown errors like TIME_WAIT */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		xs_tcp_force_close(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		xprt_clear_connecting(xprt);
		current->flags &= ~PF_FSTRANS;
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
2218
2219/**
2220 * xs_connect - connect a socket to a remote endpoint
2221 * @task: address of RPC task that manages state of connect request
2222 *
2223 * TCP: If the remote end dropped the connection, delay reconnecting.
2224 *
2225 * UDP socket connects are synchronous, but we use a work queue anyway
2226 * to guarantee that even unprivileged user processes can set up a
2227 * socket on a privileged port.
2228 *
2229 * If a UDP socket connect fails, the delay behavior here prevents
2230 * retry floods (hard mounts).
2231 */
2232static void xs_connect(struct rpc_task *task)
2233{
2234        struct rpc_xprt *xprt = task->tk_xprt;
2235        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2236
2237        if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2238                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2239                                "seconds\n",
2240                                xprt, xprt->reestablish_timeout / HZ);
2241                queue_delayed_work(rpciod_workqueue,
2242                                   &transport->connect_worker,
2243                                   xprt->reestablish_timeout);
2244                xprt->reestablish_timeout <<= 1;
2245                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2246                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2247                if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2248                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2249        } else {
2250                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2251                queue_delayed_work(rpciod_workqueue,
2252                                   &transport->connect_worker, 0);
2253        }
2254}
2255
/**
 * xs_local_print_stats - display AF_LOCAL socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	long idle_time = 0;

	/* Idle time is only meaningful while the transport is connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
			"%llu %llu %lu %llu %llu\n",
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u,
			xprt->stat.max_slots,
			xprt->stat.sending_u,
			xprt->stat.pending_u);
}
2284
/**
 * xs_udp_print_stats - display UDP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu "
			"%lu %llu %llu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u,
			xprt->stat.max_slots,
			xprt->stat.sending_u,
			xprt->stat.pending_u);
}
2308
/**
 * xs_tcp_print_stats - display TCP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	/* Idle time is only meaningful while the transport is connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu "
			"%llu %llu %lu %llu %llu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u,
			xprt->stat.max_slots,
			xprt->stat.sending_u,
			xprt->stat.pending_u);
}
2339
2340/*
2341 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2342 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2343 * to use the server side send routines.
2344 */
2345static void *bc_malloc(struct rpc_task *task, size_t size)
2346{
2347        struct page *page;
2348        struct rpc_buffer *buf;
2349
2350        BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2351        page = alloc_page(GFP_KERNEL);
2352
2353        if (!page)
2354                return NULL;
2355
2356        buf = page_address(page);
2357        buf->len = PAGE_SIZE;
2358
2359        return buf->data;
2360}
2361
/*
 * Free the space allocated in the bc_alloc routine
 */
static void bc_free(void *buffer)
{
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	/* Recover the rpc_buffer header; it is page-aligned because
	 * bc_malloc() placed it at the start of a fresh page */
	buf = container_of(buffer, struct rpc_buffer, data);
	free_page((unsigned long)buf);
}
2375
/*
 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
 *
 * Returns the number of bytes sent, or -EAGAIN on a short send.
 */
static int bc_sendto(struct rpc_rqst *req)
{
	int len;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	unsigned long headoff;
	unsigned long tailoff;

	/* Prefix the reply with the TCP record-marker length word */
	xs_encode_stream_record_marker(xbufp);

	/* Page offsets of the head and tail iovecs, needed by the
	 * page-based server-side send path */
	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
	len = svc_send_common(sock, xbufp,
			      virt_to_page(xbufp->head[0].iov_base), headoff,
			      xbufp->tail[0].iov_base, tailoff);

	/* A partial send cannot be resumed; report it as -EAGAIN */
	if (len != xbufp->len) {
		printk(KERN_NOTICE "Error sending entire callback!\n");
		len = -EAGAIN;
	}

	return len;
}
2406
2407/*
2408 * The send routine. Borrows from svc_send
2409 */
2410static int bc_send_request(struct rpc_task *task)
2411{
2412        struct rpc_rqst *req = task->tk_rqstp;
2413        struct svc_xprt *xprt;
2414        struct svc_sock         *svsk;
2415        u32                     len;
2416
2417        dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2418        /*
2419         * Get the server socket associated with this callback xprt
2420         */
2421        xprt = req->rq_xprt->bc_xprt;
2422        svsk = container_of(xprt, struct svc_sock, sk_xprt);
2423
2424        /*
2425         * Grab the mutex to serialize data as the connection is shared
2426         * with the fore channel
2427         */
2428        if (!mutex_trylock(&xprt->xpt_mutex)) {
2429                rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2430                if (!mutex_trylock(&xprt->xpt_mutex))
2431                        return -EAGAIN;
2432                rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2433        }
2434        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2435                len = -ENOTCONN;
2436        else
2437                len = bc_sendto(req);
2438        mutex_unlock(&xprt->xpt_mutex);
2439
2440        if (len > 0)
2441                len = 0;
2442
2443        return len;
2444}
2445
/*
 * The close routine. Since this is client initiated, we do nothing
 */

static void bc_close(struct rpc_xprt *xprt)
{
	/* Intentionally empty: the backchannel rides on the client's
	 * connection, which the fore channel owns and closes. */
}
2453
/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, we do nothing
 */

static void bc_destroy(struct rpc_xprt *xprt)
{
	/* NOTE(review): nothing is freed here -- confirm the backchannel
	 * xprt's memory is released elsewhere, or this leaks. */
}
2462
/*
 * Transport operations for AF_LOCAL (unix-domain) sockets.  No
 * rpcbind or port handling; shares the TCP stream send/release paths.
 */
static struct rpc_xprt_ops xs_local_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= xs_local_rpcbind,
	.set_port		= xs_local_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_local_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_local_print_stats,
};
2478
/*
 * Transport operations for UDP sockets: congestion-controlled slot
 * reservation, RTT-based retransmit timeouts, and a timer callback
 * that shrinks the congestion window on timeout.
 */
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
2497
/*
 * Transport operations for TCP sockets.
 */
static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_lock_and_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2513
/*
 * The rpc_xprt_ops for the server backchannel: sends ride on the fore
 * channel's connection, so connect/close/destroy are no-ops and the
 * buffers come from whole scratch pages (bc_malloc/bc_free).
 */

static struct rpc_xprt_ops bc_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= xs_local_rpcbind,
	.buf_alloc		= bc_malloc,
	.buf_free		= bc_free,
	.send_request		= bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= bc_close,
	.destroy		= bc_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2531
2532static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2533{
2534        static const struct sockaddr_in sin = {
2535                .sin_family             = AF_INET,
2536                .sin_addr.s_addr        = htonl(INADDR_ANY),
2537        };
2538        static const struct sockaddr_in6 sin6 = {
2539                .sin6_family            = AF_INET6,
2540                .sin6_addr              = IN6ADDR_ANY_INIT,
2541        };
2542
2543        switch (family) {
2544        case AF_LOCAL:
2545                break;
2546        case AF_INET:
2547                memcpy(sap, &sin, sizeof(sin));
2548                break;
2549        case AF_INET6:
2550                memcpy(sap, &sin6, sizeof(sin6));
2551                break;
2552        default:
2553                dprintk("RPC:       %s: Bad address family\n", __func__);
2554                return -EAFNOSUPPORT;
2555        }
2556        return 0;
2557}
2558
2559static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2560                                      unsigned int slot_table_size,
2561                                      unsigned int max_slot_table_size)
2562{
2563        struct rpc_xprt *xprt;
2564        struct sock_xprt *new;
2565
2566        if (args->addrlen > sizeof(xprt->addr)) {
2567                dprintk("RPC:       xs_setup_xprt: address too large\n");
2568                return ERR_PTR(-EBADF);
2569        }
2570
2571        xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2572                        max_slot_table_size);
2573        if (xprt == NULL) {
2574                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2575                                "rpc_xprt\n");
2576                return ERR_PTR(-ENOMEM);
2577        }
2578
2579        new = container_of(xprt, struct sock_xprt, xprt);
2580        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2581        xprt->addrlen = args->addrlen;
2582        if (args->srcaddr)
2583                memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2584        else {
2585                int err;
2586                err = xs_init_anyaddr(args->dstaddr->sa_family,
2587                                        (struct sockaddr *)&new->srcaddr);
2588                if (err != 0) {
2589                        xprt_free(xprt);
2590                        return ERR_PTR(err);
2591                }
2592        }
2593
2594        return xprt;
2595}
2596
/* Default RPC timeout for AF_LOCAL transports: a fixed 10 second timeout
 * (initval == maxval, so no exponential backoff) with 2 retransmits. */
static const struct rpc_timeout xs_local_default_timeout = {
	.to_initval = 10 * HZ,
	.to_maxval = 10 * HZ,
	.to_retries = 2,
};
2602
2603/**
2604 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2605 * @args: rpc transport creation arguments
2606 *
2607 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2608 */
2609static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2610{
2611        struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2612        struct sock_xprt *transport;
2613        struct rpc_xprt *xprt;
2614        struct rpc_xprt *ret;
2615
2616        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2617                        xprt_max_tcp_slot_table_entries);
2618        if (IS_ERR(xprt))
2619                return xprt;
2620        transport = container_of(xprt, struct sock_xprt, xprt);
2621
2622        xprt->prot = 0;
2623        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2624        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2625
2626        xprt->bind_timeout = XS_BIND_TO;
2627        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2628        xprt->idle_timeout = XS_IDLE_DISC_TO;
2629
2630        xprt->ops = &xs_local_ops;
2631        xprt->timeout = &xs_local_default_timeout;
2632
2633        switch (sun->sun_family) {
2634        case AF_LOCAL:
2635                if (sun->sun_path[0] != '/') {
2636                        dprintk("RPC:       bad AF_LOCAL address: %s\n",
2637                                        sun->sun_path);
2638                        ret = ERR_PTR(-EINVAL);
2639                        goto out_err;
2640                }
2641                xprt_set_bound(xprt);
2642                INIT_DELAYED_WORK(&transport->connect_worker,
2643                                        xs_local_setup_socket);
2644                xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2645                break;
2646        default:
2647                ret = ERR_PTR(-EAFNOSUPPORT);
2648                goto out_err;
2649        }
2650
2651        dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2652                        xprt->address_strings[RPC_DISPLAY_ADDR]);
2653
2654        if (try_module_get(THIS_MODULE))
2655                return xprt;
2656        ret = ERR_PTR(-EINVAL);
2657out_err:
2658        xprt_free(xprt);
2659        return ret;
2660}
2661
/* Default RPC timeout for UDP transports: start at 5 seconds, back off
 * linearly in 5 second increments up to 30 seconds, 5 retransmits. */
static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};
2668
2669/**
2670 * xs_setup_udp - Set up transport to use a UDP socket
2671 * @args: rpc transport creation arguments
2672 *
2673 */
2674static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2675{
2676        struct sockaddr *addr = args->dstaddr;
2677        struct rpc_xprt *xprt;
2678        struct sock_xprt *transport;
2679        struct rpc_xprt *ret;
2680
2681        xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2682                        xprt_udp_slot_table_entries);
2683        if (IS_ERR(xprt))
2684                return xprt;
2685        transport = container_of(xprt, struct sock_xprt, xprt);
2686
2687        xprt->prot = IPPROTO_UDP;
2688        xprt->tsh_size = 0;
2689        /* XXX: header size can vary due to auth type, IPv6, etc. */
2690        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2691
2692        xprt->bind_timeout = XS_BIND_TO;
2693        xprt->reestablish_timeout = XS_UDP_REEST_TO;
2694        xprt->idle_timeout = XS_IDLE_DISC_TO;
2695
2696        xprt->ops = &xs_udp_ops;
2697
2698        xprt->timeout = &xs_udp_default_timeout;
2699
2700        switch (addr->sa_family) {
2701        case AF_INET:
2702                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2703                        xprt_set_bound(xprt);
2704
2705                INIT_DELAYED_WORK(&transport->connect_worker,
2706                                        xs_udp_setup_socket);
2707                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2708                break;
2709        case AF_INET6:
2710                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2711                        xprt_set_bound(xprt);
2712
2713                INIT_DELAYED_WORK(&transport->connect_worker,
2714                                        xs_udp_setup_socket);
2715                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2716                break;
2717        default:
2718                ret = ERR_PTR(-EAFNOSUPPORT);
2719                goto out_err;
2720        }
2721
2722        if (xprt_bound(xprt))
2723                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2724                                xprt->address_strings[RPC_DISPLAY_ADDR],
2725                                xprt->address_strings[RPC_DISPLAY_PORT],
2726                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2727        else
2728                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2729                                xprt->address_strings[RPC_DISPLAY_ADDR],
2730                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2731
2732        if (try_module_get(THIS_MODULE))
2733                return xprt;
2734        ret = ERR_PTR(-EINVAL);
2735out_err:
2736        xprt_free(xprt);
2737        return ret;
2738}
2739
/* Default RPC timeout for TCP transports: a fixed 60 second timeout
 * (initval == maxval, so no backoff) with 2 retransmits; TCP itself
 * handles loss recovery. */
static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};
2745
2746/**
2747 * xs_setup_tcp - Set up transport to use a TCP socket
2748 * @args: rpc transport creation arguments
2749 *
2750 */
2751static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2752{
2753        struct sockaddr *addr = args->dstaddr;
2754        struct rpc_xprt *xprt;
2755        struct sock_xprt *transport;
2756        struct rpc_xprt *ret;
2757
2758        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2759                        xprt_max_tcp_slot_table_entries);
2760        if (IS_ERR(xprt))
2761                return xprt;
2762        transport = container_of(xprt, struct sock_xprt, xprt);
2763
2764        xprt->prot = IPPROTO_TCP;
2765        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2766        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2767
2768        xprt->bind_timeout = XS_BIND_TO;
2769        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2770        xprt->idle_timeout = XS_IDLE_DISC_TO;
2771
2772        xprt->ops = &xs_tcp_ops;
2773        xprt->timeout = &xs_tcp_default_timeout;
2774
2775        switch (addr->sa_family) {
2776        case AF_INET:
2777                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2778                        xprt_set_bound(xprt);
2779
2780                INIT_DELAYED_WORK(&transport->connect_worker,
2781                                        xs_tcp_setup_socket);
2782                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2783                break;
2784        case AF_INET6:
2785                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2786                        xprt_set_bound(xprt);
2787
2788                INIT_DELAYED_WORK(&transport->connect_worker,
2789                                        xs_tcp_setup_socket);
2790                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2791                break;
2792        default:
2793                ret = ERR_PTR(-EAFNOSUPPORT);
2794                goto out_err;
2795        }
2796
2797        if (xprt_bound(xprt))
2798                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2799                                xprt->address_strings[RPC_DISPLAY_ADDR],
2800                                xprt->address_strings[RPC_DISPLAY_PORT],
2801                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2802        else
2803                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2804                                xprt->address_strings[RPC_DISPLAY_ADDR],
2805                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2806
2807
2808        if (try_module_get(THIS_MODULE))
2809                return xprt;
2810        ret = ERR_PTR(-EINVAL);
2811out_err:
2812        xprt_free(xprt);
2813        return ret;
2814}
2815
2816/**
2817 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2818 * @args: rpc transport creation arguments
2819 *
2820 */
2821static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2822{
2823        struct sockaddr *addr = args->dstaddr;
2824        struct rpc_xprt *xprt;
2825        struct sock_xprt *transport;
2826        struct svc_sock *bc_sock;
2827        struct rpc_xprt *ret;
2828
2829        if (args->bc_xprt->xpt_bc_xprt) {
2830                /*
2831                 * This server connection already has a backchannel
2832                 * export; we can't create a new one, as we wouldn't be
2833                 * able to match replies based on xid any more.  So,
2834                 * reuse the already-existing one:
2835                 */
2836                 return args->bc_xprt->xpt_bc_xprt;
2837        }
2838        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2839                        xprt_tcp_slot_table_entries);
2840        if (IS_ERR(xprt))
2841                return xprt;
2842        transport = container_of(xprt, struct sock_xprt, xprt);
2843
2844        xprt->prot = IPPROTO_TCP;
2845        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2846        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2847        xprt->timeout = &xs_tcp_default_timeout;
2848
2849        /* backchannel */
2850        xprt_set_bound(xprt);
2851        xprt->bind_timeout = 0;
2852        xprt->reestablish_timeout = 0;
2853        xprt->idle_timeout = 0;
2854
2855        xprt->ops = &bc_tcp_ops;
2856
2857        switch (addr->sa_family) {
2858        case AF_INET:
2859                xs_format_peer_addresses(xprt, "tcp",
2860                                         RPCBIND_NETID_TCP);
2861                break;
2862        case AF_INET6:
2863                xs_format_peer_addresses(xprt, "tcp",
2864                                   RPCBIND_NETID_TCP6);
2865                break;
2866        default:
2867                ret = ERR_PTR(-EAFNOSUPPORT);
2868                goto out_err;
2869        }
2870
2871        dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2872                        xprt->address_strings[RPC_DISPLAY_ADDR],
2873                        xprt->address_strings[RPC_DISPLAY_PORT],
2874                        xprt->address_strings[RPC_DISPLAY_PROTO]);
2875
2876        /*
2877         * Once we've associated a backchannel xprt with a connection,
2878         * we want to keep it around as long as long as the connection
2879         * lasts, in case we need to start using it for a backchannel
2880         * again; this reference won't be dropped until bc_xprt is
2881         * destroyed.
2882         */
2883        xprt_get(xprt);
2884        args->bc_xprt->xpt_bc_xprt = xprt;
2885        xprt->bc_xprt = args->bc_xprt;
2886        bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2887        transport->sock = bc_sock->sk_sock;
2888        transport->inet = bc_sock->sk_sk;
2889
2890        /*
2891         * Since we don't want connections for the backchannel, we set
2892         * the xprt status to connected
2893         */
2894        xprt_set_connected(xprt);
2895
2896
2897        if (try_module_get(THIS_MODULE))
2898                return xprt;
2899        xprt_put(xprt);
2900        ret = ERR_PTR(-EINVAL);
2901out_err:
2902        xprt_free(xprt);
2903        return ret;
2904}
2905
/* Transport class for AF_LOCAL (named UNIX socket) RPC transports. */
static struct xprt_class	xs_local_transport = {
	.list		= LIST_HEAD_INIT(xs_local_transport.list),
	.name		= "named UNIX socket",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_LOCAL,
	.setup		= xs_setup_local,
};
2913
/* Transport class for UDP RPC transports. */
static struct xprt_class	xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_UDP,
	.setup		= xs_setup_udp,
};
2921
/* Transport class for TCP RPC transports. */
static struct xprt_class	xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_TCP,
	.setup		= xs_setup_tcp,
};
2929
/* Transport class for the NFSv4.1 TCP backchannel. */
static struct xprt_class	xs_bc_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
	.name		= "tcp NFSv4.1 backchannel",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_BC_TCP,
	.setup		= xs_setup_bc_tcp,
};
2937
/**
 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
 *
 * Always returns 0.
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	/* Register the sysctl table only once across repeated calls. */
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	/* NOTE(review): xprt_register_transport() return values are
	 * ignored here; a duplicate-ident failure would go unnoticed. */
	xprt_register_transport(&xs_local_transport);
	xprt_register_transport(&xs_udp_transport);
	xprt_register_transport(&xs_tcp_transport);
	xprt_register_transport(&xs_bc_tcp_transport);

	return 0;
}
2956
/**
 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
 *
 * Mirror of init_socket_xprt(): drops the sysctl table (if it was
 * registered) and unregisters all four transport classes.
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xs_local_transport);
	xprt_unregister_transport(&xs_udp_transport);
	xprt_unregister_transport(&xs_tcp_transport);
	xprt_unregister_transport(&xs_bc_tcp_transport);
}
2975
2976static int param_set_uint_minmax(const char *val,
2977                const struct kernel_param *kp,
2978                unsigned int min, unsigned int max)
2979{
2980        unsigned long num;
2981        int ret;
2982
2983        if (!val)
2984                return -EINVAL;
2985        ret = strict_strtoul(val, 0, &num);
2986        if (ret == -EINVAL || num < min || num > max)
2987                return -EINVAL;
2988        *((unsigned int *)kp->arg) = num;
2989        return 0;
2990}
2991
/* Setter for the "portnr" parameter type: accepts only values within the
 * valid reserved-port range. */
static int param_set_portnr(const char *val, const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_RESVPORT,
			RPC_MAX_RESVPORT);
}
2998
/* param ops for the "portnr" type: range-checked set, stock uint get. */
static struct kernel_param_ops param_ops_portnr = {
	.set = param_set_portnr,
	.get = param_get_uint,
};

/* Compile-time type check hooked up by module_param_named() below. */
#define param_check_portnr(name, p) \
	__param_check(name, p, unsigned int);

/* Reserved source-port range tunables (root-writable, world-readable). */
module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
3009
/* Setter for the "slot_table_size" parameter type: clamps the initial
 * slot-table size to [RPC_MIN_SLOT_TABLE, RPC_MAX_SLOT_TABLE]. */
static int param_set_slot_table_size(const char *val,
				     const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE);
}
3017
/* param ops for the "slot_table_size" type. */
static struct kernel_param_ops param_ops_slot_table_size = {
	.set = param_set_slot_table_size,
	.get = param_get_uint,
};

/* Compile-time type check hooked up by module_param_named() below. */
#define param_check_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);
3025
/* Setter for the "max_slot_table_size" parameter type: like
 * param_set_slot_table_size() but with the larger dynamic-growth
 * ceiling RPC_MAX_SLOT_TABLE_LIMIT. */
static int param_set_max_slot_table_size(const char *val,
				     const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE_LIMIT);
}
3033
/* param ops for the "max_slot_table_size" type. */
static struct kernel_param_ops param_ops_max_slot_table_size = {
	.set = param_set_max_slot_table_size,
	.get = param_get_uint,
};

/* Compile-time type check hooked up by module_param_named() below. */
#define param_check_max_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);
3041
/* Slot-table tunables (root-writable, world-readable): initial TCP and
 * UDP table sizes, plus the TCP dynamic-growth ceiling. */
module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
		   slot_table_size, 0644);
module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
		   max_slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
		   slot_table_size, 0644);
3048
3049
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.