linux/net/sunrpc/xprtsock.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/xprtsock.c
   3 *
   4 * Client-side transport implementation for sockets.
   5 *
   6 * TCP callback races fixes (C) 1998 Red Hat
   7 * TCP send fixes (C) 1998 Red Hat
   8 * TCP NFS related read + write fixes
   9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  10 *
  11 * Rewrite of large parts of the code in order to stabilize TCP stuff.
  12 * Fix behaviour when socket buffer is full.
  13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  14 *
  15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
  16 *
  17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
  18 *   <gilles.quillard@bull.net>
  19 */
  20
  21#include <linux/types.h>
  22#include <linux/string.h>
  23#include <linux/slab.h>
  24#include <linux/module.h>
  25#include <linux/capability.h>
  26#include <linux/pagemap.h>
  27#include <linux/errno.h>
  28#include <linux/socket.h>
  29#include <linux/in.h>
  30#include <linux/net.h>
  31#include <linux/mm.h>
  32#include <linux/un.h>
  33#include <linux/udp.h>
  34#include <linux/tcp.h>
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/sched.h>
  37#include <linux/sunrpc/svcsock.h>
  38#include <linux/sunrpc/xprtsock.h>
  39#include <linux/file.h>
  40#ifdef CONFIG_SUNRPC_BACKCHANNEL
  41#include <linux/sunrpc/bc_xprt.h>
  42#endif
  43
  44#include <net/sock.h>
  45#include <net/checksum.h>
  46#include <net/udp.h>
  47#include <net/tcp.h>
  48
  49#include "sunrpc.h"
  50
  51static void xs_close(struct rpc_xprt *xprt);
  52
  53/*
  54 * xprtsock tunables
  55 */
  56unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
  57unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
  58unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
  59
  60unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
  61unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
  62
  63#define XS_TCP_LINGER_TO        (15U * HZ)
  64static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
  65
  66/*
  67 * We can register our own files under /proc/sys/sunrpc by
  68 * calling register_sysctl_table() again.  The files in that
  69 * directory become the union of all files registered there.
  70 *
  71 * We simply need to make sure that we don't collide with
  72 * someone else's file names!
  73 */
  74
  75#ifdef RPC_DEBUG
  76
  77static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
  78static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
  79static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
  80static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
  81static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
  82
  83static struct ctl_table_header *sunrpc_table_header;
  84
  85/*
  86 * FIXME: changing the UDP slot table size should also resize the UDP
  87 *        socket buffers for existing UDP transports
  88 */
  89static ctl_table xs_tunables_table[] = {
  90        {
  91                .procname       = "udp_slot_table_entries",
  92                .data           = &xprt_udp_slot_table_entries,
  93                .maxlen         = sizeof(unsigned int),
  94                .mode           = 0644,
  95                .proc_handler   = proc_dointvec_minmax,
  96                .extra1         = &min_slot_table_size,
  97                .extra2         = &max_slot_table_size
  98        },
  99        {
 100                .procname       = "tcp_slot_table_entries",
 101                .data           = &xprt_tcp_slot_table_entries,
 102                .maxlen         = sizeof(unsigned int),
 103                .mode           = 0644,
 104                .proc_handler   = proc_dointvec_minmax,
 105                .extra1         = &min_slot_table_size,
 106                .extra2         = &max_slot_table_size
 107        },
 108        {
 109                .procname       = "tcp_max_slot_table_entries",
 110                .data           = &xprt_max_tcp_slot_table_entries,
 111                .maxlen         = sizeof(unsigned int),
 112                .mode           = 0644,
 113                .proc_handler   = proc_dointvec_minmax,
 114                .extra1         = &min_slot_table_size,
 115                .extra2         = &max_tcp_slot_table_limit
 116        },
 117        {
 118                .procname       = "min_resvport",
 119                .data           = &xprt_min_resvport,
 120                .maxlen         = sizeof(unsigned int),
 121                .mode           = 0644,
 122                .proc_handler   = proc_dointvec_minmax,
 123                .extra1         = &xprt_min_resvport_limit,
 124                .extra2         = &xprt_max_resvport_limit
 125        },
 126        {
 127                .procname       = "max_resvport",
 128                .data           = &xprt_max_resvport,
 129                .maxlen         = sizeof(unsigned int),
 130                .mode           = 0644,
 131                .proc_handler   = proc_dointvec_minmax,
 132                .extra1         = &xprt_min_resvport_limit,
 133                .extra2         = &xprt_max_resvport_limit
 134        },
 135        {
 136                .procname       = "tcp_fin_timeout",
 137                .data           = &xs_tcp_fin_timeout,
 138                .maxlen         = sizeof(xs_tcp_fin_timeout),
 139                .mode           = 0644,
 140                .proc_handler   = proc_dointvec_jiffies,
 141        },
 142        { },
 143};
 144
 145static ctl_table sunrpc_table[] = {
 146        {
 147                .procname       = "sunrpc",
 148                .mode           = 0555,
 149                .child          = xs_tunables_table
 150        },
 151        { },
 152};
 153
 154#endif
 155
 156/*
 157 * Wait duration for a reply from the RPC portmapper.
 158 */
 159#define XS_BIND_TO              (60U * HZ)
 160
 161/*
 162 * Delay if a UDP socket connect error occurs.  This is most likely some
 163 * kind of resource problem on the local host.
 164 */
 165#define XS_UDP_REEST_TO         (2U * HZ)
 166
 167/*
 168 * The reestablish timeout allows clients to delay for a bit before attempting
 169 * to reconnect to a server that just dropped our connection.
 170 *
 171 * We implement an exponential backoff when trying to reestablish a TCP
 172 * transport connection with the server.  Some servers like to drop a TCP
 173 * connection when they are overworked, so we start with a short timeout and
 174 * increase over time if the server is down or not responding.
 175 */
 176#define XS_TCP_INIT_REEST_TO    (3U * HZ)
 177#define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
 178
 179/*
 180 * TCP idle timeout; client drops the transport socket if it is idle
 181 * for this long.  Note that we also timeout UDP sockets to prevent
 182 * holding port numbers when there is no RPC traffic.
 183 */
 184#define XS_IDLE_DISC_TO         (5U * 60 * HZ)
 185
 186#ifdef RPC_DEBUG
 187# undef  RPC_DEBUG_DATA
 188# define RPCDBG_FACILITY        RPCDBG_TRANS
 189#endif
 190
 191#ifdef RPC_DEBUG_DATA
 192static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 193{
 194        u8 *buf = (u8 *) packet;
 195        int j;
 196
 197        dprintk("RPC:       %s\n", msg);
 198        for (j = 0; j < count && j < 128; j += 4) {
 199                if (!(j & 31)) {
 200                        if (j)
 201                                dprintk("\n");
 202                        dprintk("0x%04x ", j);
 203                }
 204                dprintk("%02x%02x%02x%02x ",
 205                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
 206        }
 207        dprintk("\n");
 208}
 209#else
 210static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 211{
 212        /* NOP */
 213}
 214#endif
 215
 216struct sock_xprt {
 217        struct rpc_xprt         xprt;
 218
 219        /*
 220         * Network layer
 221         */
 222        struct socket *         sock;
 223        struct sock *           inet;
 224
 225        /*
 226         * State of TCP reply receive
 227         */
 228        __be32                  tcp_fraghdr,
 229                                tcp_xid,
 230                                tcp_calldir;
 231
 232        u32                     tcp_offset,
 233                                tcp_reclen;
 234
 235        unsigned long           tcp_copied,
 236                                tcp_flags;
 237
 238        /*
 239         * Connection of transports
 240         */
 241        struct delayed_work     connect_worker;
 242        struct sockaddr_storage srcaddr;
 243        unsigned short          srcport;
 244
 245        /*
 246         * UDP socket buffer size parameters
 247         */
 248        size_t                  rcvsize,
 249                                sndsize;
 250
 251        /*
 252         * Saved socket callback addresses
 253         */
 254        void                    (*old_data_ready)(struct sock *, int);
 255        void                    (*old_state_change)(struct sock *);
 256        void                    (*old_write_space)(struct sock *);
 257        void                    (*old_error_report)(struct sock *);
 258};
 259
 260/*
 261 * TCP receive state flags
 262 */
 263#define TCP_RCV_LAST_FRAG       (1UL << 0)
 264#define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
 265#define TCP_RCV_COPY_XID        (1UL << 2)
 266#define TCP_RCV_COPY_DATA       (1UL << 3)
 267#define TCP_RCV_READ_CALLDIR    (1UL << 4)
 268#define TCP_RCV_COPY_CALLDIR    (1UL << 5)
 269
 270/*
 271 * TCP RPC flags
 272 */
 273#define TCP_RPC_REPLY           (1UL << 6)
 274
 275static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 276{
 277        return (struct sockaddr *) &xprt->addr;
 278}
 279
 280static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
 281{
 282        return (struct sockaddr_un *) &xprt->addr;
 283}
 284
 285static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
 286{
 287        return (struct sockaddr_in *) &xprt->addr;
 288}
 289
 290static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
 291{
 292        return (struct sockaddr_in6 *) &xprt->addr;
 293}
 294
 295static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
 296{
 297        struct sockaddr *sap = xs_addr(xprt);
 298        struct sockaddr_in6 *sin6;
 299        struct sockaddr_in *sin;
 300        struct sockaddr_un *sun;
 301        char buf[128];
 302
 303        switch (sap->sa_family) {
 304        case AF_LOCAL:
 305                sun = xs_addr_un(xprt);
 306                strlcpy(buf, sun->sun_path, sizeof(buf));
 307                xprt->address_strings[RPC_DISPLAY_ADDR] =
 308                                                kstrdup(buf, GFP_KERNEL);
 309                break;
 310        case AF_INET:
 311                (void)rpc_ntop(sap, buf, sizeof(buf));
 312                xprt->address_strings[RPC_DISPLAY_ADDR] =
 313                                                kstrdup(buf, GFP_KERNEL);
 314                sin = xs_addr_in(xprt);
 315                snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 316                break;
 317        case AF_INET6:
 318                (void)rpc_ntop(sap, buf, sizeof(buf));
 319                xprt->address_strings[RPC_DISPLAY_ADDR] =
 320                                                kstrdup(buf, GFP_KERNEL);
 321                sin6 = xs_addr_in6(xprt);
 322                snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 323                break;
 324        default:
 325                BUG();
 326        }
 327
 328        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 329}
 330
 331static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
 332{
 333        struct sockaddr *sap = xs_addr(xprt);
 334        char buf[128];
 335
 336        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 337        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 338
 339        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 340        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 341}
 342
 343static void xs_format_peer_addresses(struct rpc_xprt *xprt,
 344                                     const char *protocol,
 345                                     const char *netid)
 346{
 347        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 348        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 349        xs_format_common_peer_addresses(xprt);
 350        xs_format_common_peer_ports(xprt);
 351}
 352
 353static void xs_update_peer_port(struct rpc_xprt *xprt)
 354{
 355        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 356        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 357
 358        xs_format_common_peer_ports(xprt);
 359}
 360
 361static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 362{
 363        unsigned int i;
 364
 365        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 366                switch (i) {
 367                case RPC_DISPLAY_PROTO:
 368                case RPC_DISPLAY_NETID:
 369                        continue;
 370                default:
 371                        kfree(xprt->address_strings[i]);
 372                }
 373}
 374
 375#define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
 376
 377static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 378{
 379        struct msghdr msg = {
 380                .msg_name       = addr,
 381                .msg_namelen    = addrlen,
 382                .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
 383        };
 384        struct kvec iov = {
 385                .iov_base       = vec->iov_base + base,
 386                .iov_len        = vec->iov_len - base,
 387        };
 388
 389        if (iov.iov_len != 0)
 390                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
 391        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
 392}
 393
 394static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
 395{
 396        struct page **ppage;
 397        unsigned int remainder;
 398        int err, sent = 0;
 399
 400        remainder = xdr->page_len - base;
 401        base += xdr->page_base;
 402        ppage = xdr->pages + (base >> PAGE_SHIFT);
 403        base &= ~PAGE_MASK;
 404        for(;;) {
 405                unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
 406                int flags = XS_SENDMSG_FLAGS;
 407
 408                remainder -= len;
 409                if (remainder != 0 || more)
 410                        flags |= MSG_MORE;
 411                err = sock->ops->sendpage(sock, *ppage, base, len, flags);
 412                if (remainder == 0 || err != len)
 413                        break;
 414                sent += err;
 415                ppage++;
 416                base = 0;
 417        }
 418        if (sent == 0)
 419                return err;
 420        if (err > 0)
 421                sent += err;
 422        return sent;
 423}
 424
 425/**
 426 * xs_sendpages - write pages directly to a socket
 427 * @sock: socket to send on
 428 * @addr: UDP only -- address of destination
 429 * @addrlen: UDP only -- length of destination address
 430 * @xdr: buffer containing this request
 431 * @base: starting position in the buffer
 432 *
 433 */
 434static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
 435{
 436        unsigned int remainder = xdr->len - base;
 437        int err, sent = 0;
 438
 439        if (unlikely(!sock))
 440                return -ENOTSOCK;
 441
 442        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
 443        if (base != 0) {
 444                addr = NULL;
 445                addrlen = 0;
 446        }
 447
 448        if (base < xdr->head[0].iov_len || addr != NULL) {
 449                unsigned int len = xdr->head[0].iov_len - base;
 450                remainder -= len;
 451                err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
 452                if (remainder == 0 || err != len)
 453                        goto out;
 454                sent += err;
 455                base = 0;
 456        } else
 457                base -= xdr->head[0].iov_len;
 458
 459        if (base < xdr->page_len) {
 460                unsigned int len = xdr->page_len - base;
 461                remainder -= len;
 462                err = xs_send_pagedata(sock, xdr, base, remainder != 0);
 463                if (remainder == 0 || err != len)
 464                        goto out;
 465                sent += err;
 466                base = 0;
 467        } else
 468                base -= xdr->page_len;
 469
 470        if (base >= xdr->tail[0].iov_len)
 471                return sent;
 472        err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
 473out:
 474        if (sent == 0)
 475                return err;
 476        if (err > 0)
 477                sent += err;
 478        return sent;
 479}
 480
 481static void xs_nospace_callback(struct rpc_task *task)
 482{
 483        struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
 484
 485        transport->inet->sk_write_pending--;
 486        clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 487}
 488
 489/**
 490 * xs_nospace - place task on wait queue if transmit was incomplete
 491 * @task: task to put to sleep
 492 *
 493 */
 494static int xs_nospace(struct rpc_task *task)
 495{
 496        struct rpc_rqst *req = task->tk_rqstp;
 497        struct rpc_xprt *xprt = req->rq_xprt;
 498        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 499        int ret = -EAGAIN;
 500
 501        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
 502                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 503                        req->rq_slen);
 504
 505        /* Protect against races with write_space */
 506        spin_lock_bh(&xprt->transport_lock);
 507
 508        /* Don't race with disconnect */
 509        if (xprt_connected(xprt)) {
 510                if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
 511                        /*
 512                         * Notify TCP that we're limited by the application
 513                         * window size
 514                         */
 515                        set_bit(SOCK_NOSPACE, &transport->sock->flags);
 516                        transport->inet->sk_write_pending++;
 517                        /* ...and wait for more buffer space */
 518                        xprt_wait_for_buffer_space(task, xs_nospace_callback);
 519                }
 520        } else {
 521                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 522                ret = -ENOTCONN;
 523        }
 524
 525        spin_unlock_bh(&xprt->transport_lock);
 526        return ret;
 527}
 528
 529/*
 530 * Construct a stream transport record marker in @buf.
 531 */
 532static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 533{
 534        u32 reclen = buf->len - sizeof(rpc_fraghdr);
 535        rpc_fraghdr *base = buf->head[0].iov_base;
 536        *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
 537}
 538
 539/**
 540 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 541 * @task: RPC task that manages the state of an RPC request
 542 *
 543 * Return values:
 544 *        0:    The request has been sent
 545 *   EAGAIN:    The socket was blocked, please call again later to
 546 *              complete the request
 547 * ENOTCONN:    Caller needs to invoke connect logic then call again
 548 *    other:    Some other error occured, the request was not sent
 549 */
 550static int xs_local_send_request(struct rpc_task *task)
 551{
 552        struct rpc_rqst *req = task->tk_rqstp;
 553        struct rpc_xprt *xprt = req->rq_xprt;
 554        struct sock_xprt *transport =
 555                                container_of(xprt, struct sock_xprt, xprt);
 556        struct xdr_buf *xdr = &req->rq_snd_buf;
 557        int status;
 558
 559        xs_encode_stream_record_marker(&req->rq_snd_buf);
 560
 561        xs_pktdump("packet data:",
 562                        req->rq_svec->iov_base, req->rq_svec->iov_len);
 563
 564        status = xs_sendpages(transport->sock, NULL, 0,
 565                                                xdr, req->rq_bytes_sent);
 566        dprintk("RPC:       %s(%u) = %d\n",
 567                        __func__, xdr->len - req->rq_bytes_sent, status);
 568        if (likely(status >= 0)) {
 569                req->rq_bytes_sent += status;
 570                req->rq_xmit_bytes_sent += status;
 571                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 572                        req->rq_bytes_sent = 0;
 573                        return 0;
 574                }
 575                status = -EAGAIN;
 576        }
 577
 578        switch (status) {
 579        case -EAGAIN:
 580                status = xs_nospace(task);
 581                break;
 582        default:
 583                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 584                        -status);
 585        case -EPIPE:
 586                xs_close(xprt);
 587                status = -ENOTCONN;
 588        }
 589
 590        return status;
 591}
 592
 593/**
 594 * xs_udp_send_request - write an RPC request to a UDP socket
 595 * @task: address of RPC task that manages the state of an RPC request
 596 *
 597 * Return values:
 598 *        0:    The request has been sent
 599 *   EAGAIN:    The socket was blocked, please call again later to
 600 *              complete the request
 601 * ENOTCONN:    Caller needs to invoke connect logic then call again
 602 *    other:    Some other error occurred, the request was not sent
 603 */
 604static int xs_udp_send_request(struct rpc_task *task)
 605{
 606        struct rpc_rqst *req = task->tk_rqstp;
 607        struct rpc_xprt *xprt = req->rq_xprt;
 608        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 609        struct xdr_buf *xdr = &req->rq_snd_buf;
 610        int status;
 611
 612        xs_pktdump("packet data:",
 613                                req->rq_svec->iov_base,
 614                                req->rq_svec->iov_len);
 615
 616        if (!xprt_bound(xprt))
 617                return -ENOTCONN;
 618        status = xs_sendpages(transport->sock,
 619                              xs_addr(xprt),
 620                              xprt->addrlen, xdr,
 621                              req->rq_bytes_sent);
 622
 623        dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
 624                        xdr->len - req->rq_bytes_sent, status);
 625
 626        if (status >= 0) {
 627                req->rq_xmit_bytes_sent += status;
 628                if (status >= req->rq_slen)
 629                        return 0;
 630                /* Still some bytes left; set up for a retry later. */
 631                status = -EAGAIN;
 632        }
 633
 634        switch (status) {
 635        case -ENOTSOCK:
 636                status = -ENOTCONN;
 637                /* Should we call xs_close() here? */
 638                break;
 639        case -EAGAIN:
 640                status = xs_nospace(task);
 641                break;
 642        default:
 643                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 644                        -status);
 645        case -ENETUNREACH:
 646        case -EPIPE:
 647        case -ECONNREFUSED:
 648                /* When the server has died, an ICMP port unreachable message
 649                 * prompts ECONNREFUSED. */
 650                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 651        }
 652
 653        return status;
 654}
 655
 656/**
 657 * xs_tcp_shutdown - gracefully shut down a TCP socket
 658 * @xprt: transport
 659 *
 660 * Initiates a graceful shutdown of the TCP socket by calling the
 661 * equivalent of shutdown(SHUT_WR);
 662 */
 663static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 664{
 665        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 666        struct socket *sock = transport->sock;
 667
 668        if (sock != NULL)
 669                kernel_sock_shutdown(sock, SHUT_WR);
 670}
 671
 672/**
 673 * xs_tcp_send_request - write an RPC request to a TCP socket
 674 * @task: address of RPC task that manages the state of an RPC request
 675 *
 676 * Return values:
 677 *        0:    The request has been sent
 678 *   EAGAIN:    The socket was blocked, please call again later to
 679 *              complete the request
 680 * ENOTCONN:    Caller needs to invoke connect logic then call again
 681 *    other:    Some other error occurred, the request was not sent
 682 *
 683 * XXX: In the case of soft timeouts, should we eventually give up
 684 *      if sendmsg is not able to make progress?
 685 */
 686static int xs_tcp_send_request(struct rpc_task *task)
 687{
 688        struct rpc_rqst *req = task->tk_rqstp;
 689        struct rpc_xprt *xprt = req->rq_xprt;
 690        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 691        struct xdr_buf *xdr = &req->rq_snd_buf;
 692        int status;
 693
 694        xs_encode_stream_record_marker(&req->rq_snd_buf);
 695
 696        xs_pktdump("packet data:",
 697                                req->rq_svec->iov_base,
 698                                req->rq_svec->iov_len);
 699
 700        /* Continue transmitting the packet/record. We must be careful
 701         * to cope with writespace callbacks arriving _after_ we have
 702         * called sendmsg(). */
 703        while (1) {
 704                status = xs_sendpages(transport->sock,
 705                                        NULL, 0, xdr, req->rq_bytes_sent);
 706
 707                dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
 708                                xdr->len - req->rq_bytes_sent, status);
 709
 710                if (unlikely(status < 0))
 711                        break;
 712
 713                /* If we've sent the entire packet, immediately
 714                 * reset the count of bytes sent. */
 715                req->rq_bytes_sent += status;
 716                req->rq_xmit_bytes_sent += status;
 717                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 718                        req->rq_bytes_sent = 0;
 719                        return 0;
 720                }
 721
 722                if (status != 0)
 723                        continue;
 724                status = -EAGAIN;
 725                break;
 726        }
 727
 728        switch (status) {
 729        case -ENOTSOCK:
 730                status = -ENOTCONN;
 731                /* Should we call xs_close() here? */
 732                break;
 733        case -EAGAIN:
 734                status = xs_nospace(task);
 735                break;
 736        default:
 737                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 738                        -status);
 739        case -ECONNRESET:
 740        case -EPIPE:
 741                xs_tcp_shutdown(xprt);
 742        case -ECONNREFUSED:
 743        case -ENOTCONN:
 744                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 745        }
 746
 747        return status;
 748}
 749
 750/**
 751 * xs_tcp_release_xprt - clean up after a tcp transmission
 752 * @xprt: transport
 753 * @task: rpc task
 754 *
 755 * This cleans up if an error causes us to abort the transmission of a request.
 756 * In this case, the socket may need to be reset in order to avoid confusing
 757 * the server.
 758 */
 759static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 760{
 761        struct rpc_rqst *req;
 762
 763        if (task != xprt->snd_task)
 764                return;
 765        if (task == NULL)
 766                goto out_release;
 767        req = task->tk_rqstp;
 768        if (req == NULL)
 769                goto out_release;
 770        if (req->rq_bytes_sent == 0)
 771                goto out_release;
 772        if (req->rq_bytes_sent == req->rq_snd_buf.len)
 773                goto out_release;
 774        set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
 775out_release:
 776        xprt_release_xprt(xprt, task);
 777}
 778
 779static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 780{
 781        transport->old_data_ready = sk->sk_data_ready;
 782        transport->old_state_change = sk->sk_state_change;
 783        transport->old_write_space = sk->sk_write_space;
 784        transport->old_error_report = sk->sk_error_report;
 785}
 786
 787static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 788{
 789        sk->sk_data_ready = transport->old_data_ready;
 790        sk->sk_state_change = transport->old_state_change;
 791        sk->sk_write_space = transport->old_write_space;
 792        sk->sk_error_report = transport->old_error_report;
 793}
 794
 795static void xs_reset_transport(struct sock_xprt *transport)
 796{
 797        struct socket *sock = transport->sock;
 798        struct sock *sk = transport->inet;
 799
 800        if (sk == NULL)
 801                return;
 802
 803        transport->srcport = 0;
 804
 805        write_lock_bh(&sk->sk_callback_lock);
 806        transport->inet = NULL;
 807        transport->sock = NULL;
 808
 809        sk->sk_user_data = NULL;
 810
 811        xs_restore_old_callbacks(transport, sk);
 812        write_unlock_bh(&sk->sk_callback_lock);
 813
 814        sk->sk_no_check = 0;
 815
 816        sock_release(sock);
 817}
 818
 819/**
 820 * xs_close - close a socket
 821 * @xprt: transport
 822 *
 823 * This is used when all requests are complete; ie, no DRC state remains
 824 * on the server we want to save.
 825 *
 826 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 827 * xs_reset_transport() zeroing the socket from underneath a writer.
 828 */
 829static void xs_close(struct rpc_xprt *xprt)
 830{
 831        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 832
 833        dprintk("RPC:       xs_close xprt %p\n", xprt);
 834
 835        xs_reset_transport(transport);
 836        xprt->reestablish_timeout = 0;
 837
 838        smp_mb__before_clear_bit();
 839        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
 840        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 841        clear_bit(XPRT_CLOSING, &xprt->state);
 842        smp_mb__after_clear_bit();
 843        xprt_disconnect_done(xprt);
 844}
 845
 846static void xs_tcp_close(struct rpc_xprt *xprt)
 847{
 848        if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
 849                xs_close(xprt);
 850        else
 851                xs_tcp_shutdown(xprt);
 852}
 853
 854/**
 855 * xs_destroy - prepare to shutdown a transport
 856 * @xprt: doomed transport
 857 *
 858 */
 859static void xs_destroy(struct rpc_xprt *xprt)
 860{
 861        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 862
 863        dprintk("RPC:       xs_destroy xprt %p\n", xprt);
 864
 865        cancel_delayed_work_sync(&transport->connect_worker);
 866
 867        xs_close(xprt);
 868        xs_free_peer_addresses(xprt);
 869        xprt_free(xprt);
 870        module_put(THIS_MODULE);
 871}
 872
 873static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
 874{
 875        return (struct rpc_xprt *) sk->sk_user_data;
 876}
 877
 878static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
 879{
 880        struct xdr_skb_reader desc = {
 881                .skb            = skb,
 882                .offset         = sizeof(rpc_fraghdr),
 883                .count          = skb->len - sizeof(rpc_fraghdr),
 884        };
 885
 886        if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
 887                return -1;
 888        if (desc.count)
 889                return -1;
 890        return 0;
 891}
 892
 893/**
 894 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 895 * @sk: socket with data to read
 896 * @len: how much data to read
 897 *
 898 * Currently this assumes we can read the whole reply in a single gulp.
 899 */
 900static void xs_local_data_ready(struct sock *sk, int len)
 901{
 902        struct rpc_task *task;
 903        struct rpc_xprt *xprt;
 904        struct rpc_rqst *rovr;
 905        struct sk_buff *skb;
 906        int err, repsize, copied;
 907        u32 _xid;
 908        __be32 *xp;
 909
 910        read_lock_bh(&sk->sk_callback_lock);
 911        dprintk("RPC:       %s...\n", __func__);
 912        xprt = xprt_from_sock(sk);
 913        if (xprt == NULL)
 914                goto out;
 915
 916        skb = skb_recv_datagram(sk, 0, 1, &err);
 917        if (skb == NULL)
 918                goto out;
 919
 920        if (xprt->shutdown)
 921                goto dropit;
 922
 923        repsize = skb->len - sizeof(rpc_fraghdr);
 924        if (repsize < 4) {
 925                dprintk("RPC:       impossible RPC reply size %d\n", repsize);
 926                goto dropit;
 927        }
 928
 929        /* Copy the XID from the skb... */
 930        xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
 931        if (xp == NULL)
 932                goto dropit;
 933
 934        /* Look up and lock the request corresponding to the given XID */
 935        spin_lock(&xprt->transport_lock);
 936        rovr = xprt_lookup_rqst(xprt, *xp);
 937        if (!rovr)
 938                goto out_unlock;
 939        task = rovr->rq_task;
 940
 941        copied = rovr->rq_private_buf.buflen;
 942        if (copied > repsize)
 943                copied = repsize;
 944
 945        if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 946                dprintk("RPC:       sk_buff copy failed\n");
 947                goto out_unlock;
 948        }
 949
 950        xprt_complete_rqst(task, copied);
 951
 952 out_unlock:
 953        spin_unlock(&xprt->transport_lock);
 954 dropit:
 955        skb_free_datagram(sk, skb);
 956 out:
 957        read_unlock_bh(&sk->sk_callback_lock);
 958}
 959
 960/**
 961 * xs_udp_data_ready - "data ready" callback for UDP sockets
 962 * @sk: socket with data to read
 963 * @len: how much data to read
 964 *
 965 */
 966static void xs_udp_data_ready(struct sock *sk, int len)
 967{
 968        struct rpc_task *task;
 969        struct rpc_xprt *xprt;
 970        struct rpc_rqst *rovr;
 971        struct sk_buff *skb;
 972        int err, repsize, copied;
 973        u32 _xid;
 974        __be32 *xp;
 975
 976        read_lock_bh(&sk->sk_callback_lock);
 977        dprintk("RPC:       xs_udp_data_ready...\n");
 978        if (!(xprt = xprt_from_sock(sk)))
 979                goto out;
 980
 981        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 982                goto out;
 983
 984        if (xprt->shutdown)
 985                goto dropit;
 986
 987        repsize = skb->len - sizeof(struct udphdr);
 988        if (repsize < 4) {
 989                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
 990                goto dropit;
 991        }
 992
 993        /* Copy the XID from the skb... */
 994        xp = skb_header_pointer(skb, sizeof(struct udphdr),
 995                                sizeof(_xid), &_xid);
 996        if (xp == NULL)
 997                goto dropit;
 998
 999        /* Look up and lock the request corresponding to the given XID */
1000        spin_lock(&xprt->transport_lock);
1001        rovr = xprt_lookup_rqst(xprt, *xp);
1002        if (!rovr)
1003                goto out_unlock;
1004        task = rovr->rq_task;
1005
1006        if ((copied = rovr->rq_private_buf.buflen) > repsize)
1007                copied = repsize;
1008
1009        /* Suck it into the iovec, verify checksum if not done by hw. */
1010        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1011                UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1012                goto out_unlock;
1013        }
1014
1015        UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1016
1017        /* Something worked... */
1018        dst_confirm(skb_dst(skb));
1019
1020        xprt_adjust_cwnd(task, copied);
1021        xprt_complete_rqst(task, copied);
1022
1023 out_unlock:
1024        spin_unlock(&xprt->transport_lock);
1025 dropit:
1026        skb_free_datagram(sk, skb);
1027 out:
1028        read_unlock_bh(&sk->sk_callback_lock);
1029}
1030
1031static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1032{
1033        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1034        size_t len, used;
1035        char *p;
1036
1037        p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
1038        len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
1039        used = xdr_skb_read_bits(desc, p, len);
1040        transport->tcp_offset += used;
1041        if (used != len)
1042                return;
1043
1044        transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
1045        if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
1046                transport->tcp_flags |= TCP_RCV_LAST_FRAG;
1047        else
1048                transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
1049        transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
1050
1051        transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
1052        transport->tcp_offset = 0;
1053
1054        /* Sanity check of the record length */
1055        if (unlikely(transport->tcp_reclen < 8)) {
1056                dprintk("RPC:       invalid TCP record fragment length\n");
1057                xprt_force_disconnect(xprt);
1058                return;
1059        }
1060        dprintk("RPC:       reading TCP record fragment of length %d\n",
1061                        transport->tcp_reclen);
1062}
1063
1064static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1065{
1066        if (transport->tcp_offset == transport->tcp_reclen) {
1067                transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1068                transport->tcp_offset = 0;
1069                if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1070                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1071                        transport->tcp_flags |= TCP_RCV_COPY_XID;
1072                        transport->tcp_copied = 0;
1073                }
1074        }
1075}
1076
1077static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1078{
1079        size_t len, used;
1080        char *p;
1081
1082        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
1083        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
1084        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
1085        used = xdr_skb_read_bits(desc, p, len);
1086        transport->tcp_offset += used;
1087        if (used != len)
1088                return;
1089        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1090        transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1091        transport->tcp_copied = 4;
1092        dprintk("RPC:       reading %s XID %08x\n",
1093                        (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1094                                                              : "request with",
1095                        ntohl(transport->tcp_xid));
1096        xs_tcp_check_fraghdr(transport);
1097}
1098
1099static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1100                                       struct xdr_skb_reader *desc)
1101{
1102        size_t len, used;
1103        u32 offset;
1104        char *p;
1105
1106        /*
1107         * We want transport->tcp_offset to be 8 at the end of this routine
1108         * (4 bytes for the xid and 4 bytes for the call/reply flag).
1109         * When this function is called for the first time,
1110         * transport->tcp_offset is 4 (after having already read the xid).
1111         */
1112        offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1113        len = sizeof(transport->tcp_calldir) - offset;
1114        dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1115        p = ((char *) &transport->tcp_calldir) + offset;
1116        used = xdr_skb_read_bits(desc, p, len);
1117        transport->tcp_offset += used;
1118        if (used != len)
1119                return;
1120        transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1121        /*
1122         * We don't yet have the XDR buffer, so we will write the calldir
1123         * out after we get the buffer from the 'struct rpc_rqst'
1124         */
1125        switch (ntohl(transport->tcp_calldir)) {
1126        case RPC_REPLY:
1127                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1128                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1129                transport->tcp_flags |= TCP_RPC_REPLY;
1130                break;
1131        case RPC_CALL:
1132                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1133                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1134                transport->tcp_flags &= ~TCP_RPC_REPLY;
1135                break;
1136        default:
1137                dprintk("RPC:       invalid request message type\n");
1138                xprt_force_disconnect(&transport->xprt);
1139        }
1140        xs_tcp_check_fraghdr(transport);
1141}
1142
1143static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1144                                     struct xdr_skb_reader *desc,
1145                                     struct rpc_rqst *req)
1146{
1147        struct sock_xprt *transport =
1148                                container_of(xprt, struct sock_xprt, xprt);
1149        struct xdr_buf *rcvbuf;
1150        size_t len;
1151        ssize_t r;
1152
1153        rcvbuf = &req->rq_private_buf;
1154
1155        if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1156                /*
1157                 * Save the RPC direction in the XDR buffer
1158                 */
1159                memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1160                        &transport->tcp_calldir,
1161                        sizeof(transport->tcp_calldir));
1162                transport->tcp_copied += sizeof(transport->tcp_calldir);
1163                transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1164        }
1165
1166        len = desc->count;
1167        if (len > transport->tcp_reclen - transport->tcp_offset) {
1168                struct xdr_skb_reader my_desc;
1169
1170                len = transport->tcp_reclen - transport->tcp_offset;
1171                memcpy(&my_desc, desc, sizeof(my_desc));
1172                my_desc.count = len;
1173                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1174                                          &my_desc, xdr_skb_read_bits);
1175                desc->count -= r;
1176                desc->offset += r;
1177        } else
1178                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1179                                          desc, xdr_skb_read_bits);
1180
1181        if (r > 0) {
1182                transport->tcp_copied += r;
1183                transport->tcp_offset += r;
1184        }
1185        if (r != len) {
1186                /* Error when copying to the receive buffer,
1187                 * usually because we weren't able to allocate
1188                 * additional buffer pages. All we can do now
1189                 * is turn off TCP_RCV_COPY_DATA, so the request
1190                 * will not receive any additional updates,
1191                 * and time out.
1192                 * Any remaining data from this record will
1193                 * be discarded.
1194                 */
1195                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1196                dprintk("RPC:       XID %08x truncated request\n",
1197                                ntohl(transport->tcp_xid));
1198                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1199                                "tcp_offset = %u, tcp_reclen = %u\n",
1200                                xprt, transport->tcp_copied,
1201                                transport->tcp_offset, transport->tcp_reclen);
1202                return;
1203        }
1204
1205        dprintk("RPC:       XID %08x read %Zd bytes\n",
1206                        ntohl(transport->tcp_xid), r);
1207        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1208                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1209                        transport->tcp_offset, transport->tcp_reclen);
1210
1211        if (transport->tcp_copied == req->rq_private_buf.buflen)
1212                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1213        else if (transport->tcp_offset == transport->tcp_reclen) {
1214                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1215                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1216        }
1217}
1218
1219/*
1220 * Finds the request corresponding to the RPC xid and invokes the common
1221 * tcp read code to read the data.
1222 */
1223static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1224                                    struct xdr_skb_reader *desc)
1225{
1226        struct sock_xprt *transport =
1227                                container_of(xprt, struct sock_xprt, xprt);
1228        struct rpc_rqst *req;
1229
1230        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1231
1232        /* Find and lock the request corresponding to this xid */
1233        spin_lock(&xprt->transport_lock);
1234        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1235        if (!req) {
1236                dprintk("RPC:       XID %08x request not found!\n",
1237                                ntohl(transport->tcp_xid));
1238                spin_unlock(&xprt->transport_lock);
1239                return -1;
1240        }
1241
1242        xs_tcp_read_common(xprt, desc, req);
1243
1244        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1245                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1246
1247        spin_unlock(&xprt->transport_lock);
1248        return 0;
1249}
1250
1251#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1252/*
1253 * Obtains an rpc_rqst previously allocated and invokes the common
1254 * tcp read code to read the data.  The result is placed in the callback
1255 * queue.
1256 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1257 * connection and return -1.
1258 */
1259static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1260                                       struct xdr_skb_reader *desc)
1261{
1262        struct sock_xprt *transport =
1263                                container_of(xprt, struct sock_xprt, xprt);
1264        struct rpc_rqst *req;
1265
1266        req = xprt_alloc_bc_request(xprt);
1267        if (req == NULL) {
1268                printk(KERN_WARNING "Callback slot table overflowed\n");
1269                xprt_force_disconnect(xprt);
1270                return -1;
1271        }
1272
1273        req->rq_xid = transport->tcp_xid;
1274        dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1275        xs_tcp_read_common(xprt, desc, req);
1276
1277        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1278                struct svc_serv *bc_serv = xprt->bc_serv;
1279
1280                /*
1281                 * Add callback request to callback list.  The callback
1282                 * service sleeps on the sv_cb_waitq waiting for new
1283                 * requests.  Wake it up after adding enqueing the
1284                 * request.
1285                 */
1286                dprintk("RPC:       add callback request to list\n");
1287                spin_lock(&bc_serv->sv_cb_lock);
1288                list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1289                spin_unlock(&bc_serv->sv_cb_lock);
1290                wake_up(&bc_serv->sv_cb_waitq);
1291        }
1292
1293        req->rq_private_buf.len = transport->tcp_copied;
1294
1295        return 0;
1296}
1297
1298static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1299                                        struct xdr_skb_reader *desc)
1300{
1301        struct sock_xprt *transport =
1302                                container_of(xprt, struct sock_xprt, xprt);
1303
1304        return (transport->tcp_flags & TCP_RPC_REPLY) ?
1305                xs_tcp_read_reply(xprt, desc) :
1306                xs_tcp_read_callback(xprt, desc);
1307}
1308#else
/*
 * Without backchannel support every incoming message is handed to the
 * reply reader.  Returns that reader's result.
 */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	int status = xs_tcp_read_reply(xprt, desc);

	return status;
}
1314#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1315
1316/*
1317 * Read data off the transport.  This can be either an RPC_CALL or an
1318 * RPC_REPLY.  Relay the processing to helper functions.
1319 */
1320static void xs_tcp_read_data(struct rpc_xprt *xprt,
1321                                    struct xdr_skb_reader *desc)
1322{
1323        struct sock_xprt *transport =
1324                                container_of(xprt, struct sock_xprt, xprt);
1325
1326        if (_xs_tcp_read_data(xprt, desc) == 0)
1327                xs_tcp_check_fraghdr(transport);
1328        else {
1329                /*
1330                 * The transport_lock protects the request handling.
1331                 * There's no need to hold it to update the tcp_flags.
1332                 */
1333                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1334        }
1335}
1336
1337static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1338{
1339        size_t len;
1340
1341        len = transport->tcp_reclen - transport->tcp_offset;
1342        if (len > desc->count)
1343                len = desc->count;
1344        desc->count -= len;
1345        desc->offset += len;
1346        transport->tcp_offset += len;
1347        dprintk("RPC:       discarded %Zu bytes\n", len);
1348        xs_tcp_check_fraghdr(transport);
1349}
1350
1351static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1352{
1353        struct rpc_xprt *xprt = rd_desc->arg.data;
1354        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1355        struct xdr_skb_reader desc = {
1356                .skb    = skb,
1357                .offset = offset,
1358                .count  = len,
1359        };
1360
1361        dprintk("RPC:       xs_tcp_data_recv started\n");
1362        do {
1363                /* Read in a new fragment marker if necessary */
1364                /* Can we ever really expect to get completely empty fragments? */
1365                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1366                        xs_tcp_read_fraghdr(xprt, &desc);
1367                        continue;
1368                }
1369                /* Read in the xid if necessary */
1370                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1371                        xs_tcp_read_xid(transport, &desc);
1372                        continue;
1373                }
1374                /* Read in the call/reply flag */
1375                if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1376                        xs_tcp_read_calldir(transport, &desc);
1377                        continue;
1378                }
1379                /* Read in the request data */
1380                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1381                        xs_tcp_read_data(xprt, &desc);
1382                        continue;
1383                }
1384                /* Skip over any trailing bytes on short reads */
1385                xs_tcp_read_discard(transport, &desc);
1386        } while (desc.count);
1387        dprintk("RPC:       xs_tcp_data_recv done\n");
1388        return len - desc.count;
1389}
1390
1391/**
1392 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1393 * @sk: socket with data to read
1394 * @bytes: how much data to read
1395 *
1396 */
1397static void xs_tcp_data_ready(struct sock *sk, int bytes)
1398{
1399        struct rpc_xprt *xprt;
1400        read_descriptor_t rd_desc;
1401        int read;
1402
1403        dprintk("RPC:       xs_tcp_data_ready...\n");
1404
1405        read_lock_bh(&sk->sk_callback_lock);
1406        if (!(xprt = xprt_from_sock(sk)))
1407                goto out;
1408        if (xprt->shutdown)
1409                goto out;
1410
1411        /* Any data means we had a useful conversation, so
1412         * the we don't need to delay the next reconnect
1413         */
1414        if (xprt->reestablish_timeout)
1415                xprt->reestablish_timeout = 0;
1416
1417        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1418        rd_desc.arg.data = xprt;
1419        do {
1420                rd_desc.count = 65536;
1421                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1422        } while (read > 0);
1423out:
1424        read_unlock_bh(&sk->sk_callback_lock);
1425}
1426
1427/*
1428 * Do the equivalent of linger/linger2 handling for dealing with
1429 * broken servers that don't close the socket in a timely
1430 * fashion
1431 */
1432static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1433                unsigned long timeout)
1434{
1435        struct sock_xprt *transport;
1436
1437        if (xprt_test_and_set_connecting(xprt))
1438                return;
1439        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1440        transport = container_of(xprt, struct sock_xprt, xprt);
1441        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1442                           timeout);
1443}
1444
1445static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1446{
1447        struct sock_xprt *transport;
1448
1449        transport = container_of(xprt, struct sock_xprt, xprt);
1450
1451        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1452            !cancel_delayed_work(&transport->connect_worker))
1453                return;
1454        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1455        xprt_clear_connecting(xprt);
1456}
1457
1458static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1459{
1460        smp_mb__before_clear_bit();
1461        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1462        clear_bit(XPRT_CLOSING, &xprt->state);
1463        smp_mb__after_clear_bit();
1464        /* Mark transport as closed and wake up all pending tasks */
1465        xprt_disconnect_done(xprt);
1466}
1467
1468/**
1469 * xs_tcp_state_change - callback to handle TCP socket state changes
1470 * @sk: socket whose state has changed
1471 *
1472 */
1473static void xs_tcp_state_change(struct sock *sk)
1474{
1475        struct rpc_xprt *xprt;
1476
1477        read_lock_bh(&sk->sk_callback_lock);
1478        if (!(xprt = xprt_from_sock(sk)))
1479                goto out;
1480        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1481        dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1482                        sk->sk_state, xprt_connected(xprt),
1483                        sock_flag(sk, SOCK_DEAD),
1484                        sock_flag(sk, SOCK_ZAPPED),
1485                        sk->sk_shutdown);
1486
1487        switch (sk->sk_state) {
1488        case TCP_ESTABLISHED:
1489                spin_lock(&xprt->transport_lock);
1490                if (!xprt_test_and_set_connected(xprt)) {
1491                        struct sock_xprt *transport = container_of(xprt,
1492                                        struct sock_xprt, xprt);
1493
1494                        /* Reset TCP record info */
1495                        transport->tcp_offset = 0;
1496                        transport->tcp_reclen = 0;
1497                        transport->tcp_copied = 0;
1498                        transport->tcp_flags =
1499                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1500
1501                        xprt_wake_pending_tasks(xprt, -EAGAIN);
1502                }
1503                spin_unlock(&xprt->transport_lock);
1504                break;
1505        case TCP_FIN_WAIT1:
1506                /* The client initiated a shutdown of the socket */
1507                xprt->connect_cookie++;
1508                xprt->reestablish_timeout = 0;
1509                set_bit(XPRT_CLOSING, &xprt->state);
1510                smp_mb__before_clear_bit();
1511                clear_bit(XPRT_CONNECTED, &xprt->state);
1512                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1513                smp_mb__after_clear_bit();
1514                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1515                break;
1516        case TCP_CLOSE_WAIT:
1517                /* The server initiated a shutdown of the socket */
1518                xprt_force_disconnect(xprt);
1519                xprt->connect_cookie++;
1520        case TCP_CLOSING:
1521                /*
1522                 * If the server closed down the connection, make sure that
1523                 * we back off before reconnecting
1524                 */
1525                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1526                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1527                break;
1528        case TCP_LAST_ACK:
1529                set_bit(XPRT_CLOSING, &xprt->state);
1530                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1531                smp_mb__before_clear_bit();
1532                clear_bit(XPRT_CONNECTED, &xprt->state);
1533                smp_mb__after_clear_bit();
1534                break;
1535        case TCP_CLOSE:
1536                xs_tcp_cancel_linger_timeout(xprt);
1537                xs_sock_mark_closed(xprt);
1538        }
1539 out:
1540        read_unlock_bh(&sk->sk_callback_lock);
1541}
1542
1543/**
1544 * xs_error_report - callback mainly for catching socket errors
1545 * @sk: socket
1546 */
1547static void xs_error_report(struct sock *sk)
1548{
1549        struct rpc_xprt *xprt;
1550
1551        read_lock_bh(&sk->sk_callback_lock);
1552        if (!(xprt = xprt_from_sock(sk)))
1553                goto out;
1554        dprintk("RPC:       %s client %p...\n"
1555                        "RPC:       error %d\n",
1556                        __func__, xprt, sk->sk_err);
1557        xprt_wake_pending_tasks(xprt, -EAGAIN);
1558out:
1559        read_unlock_bh(&sk->sk_callback_lock);
1560}
1561
1562static void xs_write_space(struct sock *sk)
1563{
1564        struct socket *sock;
1565        struct rpc_xprt *xprt;
1566
1567        if (unlikely(!(sock = sk->sk_socket)))
1568                return;
1569        clear_bit(SOCK_NOSPACE, &sock->flags);
1570
1571        if (unlikely(!(xprt = xprt_from_sock(sk))))
1572                return;
1573        if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1574                return;
1575
1576        xprt_write_space(xprt);
1577}
1578
1579/**
1580 * xs_udp_write_space - callback invoked when socket buffer space
1581 *                             becomes available
1582 * @sk: socket whose state has changed
1583 *
1584 * Called when more output buffer space is available for this socket.
1585 * We try not to wake our writers until they can make "significant"
1586 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1587 * with a bunch of small requests.
1588 */
1589static void xs_udp_write_space(struct sock *sk)
1590{
1591        read_lock_bh(&sk->sk_callback_lock);
1592
1593        /* from net/core/sock.c:sock_def_write_space */
1594        if (sock_writeable(sk))
1595                xs_write_space(sk);
1596
1597        read_unlock_bh(&sk->sk_callback_lock);
1598}
1599
1600/**
1601 * xs_tcp_write_space - callback invoked when socket buffer space
1602 *                             becomes available
1603 * @sk: socket whose state has changed
1604 *
1605 * Called when more output buffer space is available for this socket.
1606 * We try not to wake our writers until they can make "significant"
1607 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1608 * with a bunch of small requests.
1609 */
1610static void xs_tcp_write_space(struct sock *sk)
1611{
1612        read_lock_bh(&sk->sk_callback_lock);
1613
1614        /* from net/core/stream.c:sk_stream_write_space */
1615        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1616                xs_write_space(sk);
1617
1618        read_unlock_bh(&sk->sk_callback_lock);
1619}
1620
1621static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1622{
1623        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1624        struct sock *sk = transport->inet;
1625
1626        if (transport->rcvsize) {
1627                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1628                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1629        }
1630        if (transport->sndsize) {
1631                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1632                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1633                sk->sk_write_space(sk);
1634        }
1635}
1636
1637/**
1638 * xs_udp_set_buffer_size - set send and receive limits
1639 * @xprt: generic transport
1640 * @sndsize: requested size of send buffer, in bytes
1641 * @rcvsize: requested size of receive buffer, in bytes
1642 *
1643 * Set socket send and receive buffer size limits.
1644 */
1645static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1646{
1647        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1648
1649        transport->sndsize = 0;
1650        if (sndsize)
1651                transport->sndsize = sndsize + 1024;
1652        transport->rcvsize = 0;
1653        if (rcvsize)
1654                transport->rcvsize = rcvsize + 1024;
1655
1656        xs_udp_do_set_buffer_size(xprt);
1657}
1658
1659/**
1660 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1661 * @task: task that timed out
1662 *
1663 * Adjust the congestion window after a retransmit timeout has occurred.
1664 */
1665static void xs_udp_timer(struct rpc_task *task)
1666{
1667        xprt_adjust_cwnd(task, -ETIMEDOUT);
1668}
1669
1670static unsigned short xs_get_random_port(void)
1671{
1672        unsigned short range = xprt_max_resvport - xprt_min_resvport;
1673        unsigned short rand = (unsigned short) net_random() % range;
1674        return rand + xprt_min_resvport;
1675}
1676
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 * Stores @port in the transport's peer address and then refreshes the
 * cached peer port information.
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	struct sockaddr *peer;

	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	peer = xs_addr(xprt);
	rpc_set_port(peer, port);
	xs_update_peer_port(xprt);
}
1690
1691static unsigned short xs_get_srcport(struct sock_xprt *transport)
1692{
1693        unsigned short port = transport->srcport;
1694
1695        if (port == 0 && transport->xprt.resvport)
1696                port = xs_get_random_port();
1697        return port;
1698}
1699
1700static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1701{
1702        if (transport->srcport != 0)
1703                transport->srcport = 0;
1704        if (!transport->xprt.resvport)
1705                return 0;
1706        if (port <= xprt_min_resvport || port > xprt_max_resvport)
1707                return xprt_max_resvport;
1708        return --port;
1709}
1710static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1711{
1712        struct sockaddr_storage myaddr;
1713        int err, nloop = 0;
1714        unsigned short port = xs_get_srcport(transport);
1715        unsigned short last;
1716
1717        memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1718        do {
1719                rpc_set_port((struct sockaddr *)&myaddr, port);
1720                err = kernel_bind(sock, (struct sockaddr *)&myaddr,
1721                                transport->xprt.addrlen);
1722                if (port == 0)
1723                        break;
1724                if (err == 0) {
1725                        transport->srcport = port;
1726                        break;
1727                }
1728                last = port;
1729                port = xs_next_srcport(transport, port);
1730                if (port > last)
1731                        nloop++;
1732        } while (err == -EADDRINUSE && nloop != 2);
1733
1734        if (myaddr.ss_family == AF_INET)
1735                dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
1736                                &((struct sockaddr_in *)&myaddr)->sin_addr,
1737                                port, err ? "failed" : "ok", err);
1738        else
1739                dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
1740                                &((struct sockaddr_in6 *)&myaddr)->sin6_addr,
1741                                port, err ? "failed" : "ok", err);
1742        return err;
1743}
1744
1745/*
1746 * We don't support autobind on AF_LOCAL sockets
1747 */
1748static void xs_local_rpcbind(struct rpc_task *task)
1749{
1750        xprt_set_bound(task->tk_xprt);
1751}
1752
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	/* Intentionally empty: nothing is updated for the AF_LOCAL transport. */
}
1756
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock class keys for RPC client sockets, one slot per address family:
 *   [0] AF_INET, [1] AF_INET6, [2] AF_LOCAL.
 * Each family gets its own slot so that a key is never registered under
 * two different class names.
 */
static struct lock_class_key xs_key[3];
static struct lock_class_key xs_slock_key[3];

/* Re-initialise the socket lock of an AF_LOCAL RPC socket with its own class. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[2], "sk_lock-AF_LOCAL-RPC", &xs_key[2]);
}

/* Re-initialise the socket lock of an AF_INET RPC socket with its own class. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Re-initialise the socket lock of an AF_INET6 RPC socket with its own class. */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}

/* Dispatch to the per-family helper; other families are left untouched. */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC the reclassify helpers are no-ops. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif
1819
1820static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1821                struct sock_xprt *transport, int family, int type, int protocol)
1822{
1823        struct socket *sock;
1824        int err;
1825
1826        err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1827        if (err < 0) {
1828                dprintk("RPC:       can't create %d transport socket (%d).\n",
1829                                protocol, -err);
1830                goto out;
1831        }
1832        xs_reclassify_socket(family, sock);
1833
1834        err = xs_bind(transport, sock);
1835        if (err) {
1836                sock_release(sock);
1837                goto out;
1838        }
1839
1840        return sock;
1841out:
1842        return ERR_PTR(err);
1843}
1844
1845static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1846                                      struct socket *sock)
1847{
1848        struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1849                                                                        xprt);
1850
1851        if (!transport->inet) {
1852                struct sock *sk = sock->sk;
1853
1854                write_lock_bh(&sk->sk_callback_lock);
1855
1856                xs_save_old_callbacks(transport, sk);
1857
1858                sk->sk_user_data = xprt;
1859                sk->sk_data_ready = xs_local_data_ready;
1860                sk->sk_write_space = xs_udp_write_space;
1861                sk->sk_error_report = xs_error_report;
1862                sk->sk_allocation = GFP_ATOMIC;
1863
1864                xprt_clear_connected(xprt);
1865
1866                /* Reset to new socket */
1867                transport->sock = sock;
1868                transport->inet = sk;
1869
1870                write_unlock_bh(&sk->sk_callback_lock);
1871        }
1872
1873        /* Tell the socket layer to start connecting... */
1874        xprt->stat.connect_count++;
1875        xprt->stat.connect_start = jiffies;
1876        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1877}
1878
1879/**
1880 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
1881 * @xprt: RPC transport to connect
1882 * @transport: socket transport to connect
1883 * @create_sock: function to create a socket of the correct type
1884 *
1885 * Invoked by a work queue tasklet.
1886 */
1887static void xs_local_setup_socket(struct work_struct *work)
1888{
1889        struct sock_xprt *transport =
1890                container_of(work, struct sock_xprt, connect_worker.work);
1891        struct rpc_xprt *xprt = &transport->xprt;
1892        struct socket *sock;
1893        int status = -EIO;
1894
1895        if (xprt->shutdown)
1896                goto out;
1897
1898        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1899        status = __sock_create(xprt->xprt_net, AF_LOCAL,
1900                                        SOCK_STREAM, 0, &sock, 1);
1901        if (status < 0) {
1902                dprintk("RPC:       can't create AF_LOCAL "
1903                        "transport socket (%d).\n", -status);
1904                goto out;
1905        }
1906        xs_reclassify_socketu(sock);
1907
1908        dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
1909                        xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1910
1911        status = xs_local_finish_connecting(xprt, sock);
1912        switch (status) {
1913        case 0:
1914                dprintk("RPC:       xprt %p connected to %s\n",
1915                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1916                xprt_set_connected(xprt);
1917                break;
1918        case -ENOENT:
1919                dprintk("RPC:       xprt %p: socket %s does not exist\n",
1920                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1921                break;
1922        default:
1923                printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
1924                                __func__, -status,
1925                                xprt->address_strings[RPC_DISPLAY_ADDR]);
1926        }
1927
1928out:
1929        xprt_clear_connecting(xprt);
1930        xprt_wake_pending_tasks(xprt, status);
1931}
1932
1933static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1934{
1935        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1936
1937        if (!transport->inet) {
1938                struct sock *sk = sock->sk;
1939
1940                write_lock_bh(&sk->sk_callback_lock);
1941
1942                xs_save_old_callbacks(transport, sk);
1943
1944                sk->sk_user_data = xprt;
1945                sk->sk_data_ready = xs_udp_data_ready;
1946                sk->sk_write_space = xs_udp_write_space;
1947                sk->sk_error_report = xs_error_report;
1948                sk->sk_no_check = UDP_CSUM_NORCV;
1949                sk->sk_allocation = GFP_ATOMIC;
1950
1951                xprt_set_connected(xprt);
1952
1953                /* Reset to new socket */
1954                transport->sock = sock;
1955                transport->inet = sk;
1956
1957                write_unlock_bh(&sk->sk_callback_lock);
1958        }
1959        xs_udp_do_set_buffer_size(xprt);
1960}
1961
1962static void xs_udp_setup_socket(struct work_struct *work)
1963{
1964        struct sock_xprt *transport =
1965                container_of(work, struct sock_xprt, connect_worker.work);
1966        struct rpc_xprt *xprt = &transport->xprt;
1967        struct socket *sock = transport->sock;
1968        int status = -EIO;
1969
1970        if (xprt->shutdown)
1971                goto out;
1972
1973        /* Start by resetting any existing state */
1974        xs_reset_transport(transport);
1975        sock = xs_create_sock(xprt, transport,
1976                        xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
1977        if (IS_ERR(sock))
1978                goto out;
1979
1980        dprintk("RPC:       worker connecting xprt %p via %s to "
1981                                "%s (port %s)\n", xprt,
1982                        xprt->address_strings[RPC_DISPLAY_PROTO],
1983                        xprt->address_strings[RPC_DISPLAY_ADDR],
1984                        xprt->address_strings[RPC_DISPLAY_PORT]);
1985
1986        xs_udp_finish_connecting(xprt, sock);
1987        status = 0;
1988out:
1989        xprt_clear_connecting(xprt);
1990        xprt_wake_pending_tasks(xprt, status);
1991}
1992
1993/*
1994 * We need to preserve the port number so the reply cache on the server can
1995 * find our cached RPC replies when we get around to reconnecting.
1996 */
1997static void xs_abort_connection(struct sock_xprt *transport)
1998{
1999        int result;
2000        struct sockaddr any;
2001
2002        dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);
2003
2004        /*
2005         * Disconnect the transport socket by doing a connect operation
2006         * with AF_UNSPEC.  This should return immediately...
2007         */
2008        memset(&any, 0, sizeof(any));
2009        any.sa_family = AF_UNSPEC;
2010        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
2011        if (!result)
2012                xs_sock_mark_closed(&transport->xprt);
2013        else
2014                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
2015                                result);
2016}
2017
2018static void xs_tcp_reuse_connection(struct sock_xprt *transport)
2019{
2020        unsigned int state = transport->inet->sk_state;
2021
2022        if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
2023                /* we don't need to abort the connection if the socket
2024                 * hasn't undergone a shutdown
2025                 */
2026                if (transport->inet->sk_shutdown == 0)
2027                        return;
2028                dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
2029                                __func__, transport->inet->sk_shutdown);
2030        }
2031        if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
2032                /* we don't need to abort the connection if the socket
2033                 * hasn't undergone a shutdown
2034                 */
2035                if (transport->inet->sk_shutdown == 0)
2036                        return;
2037                dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
2038                                "sk_shutdown set to %d\n",
2039                                __func__, transport->inet->sk_shutdown);
2040        }
2041        xs_abort_connection(transport);
2042}
2043
2044static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2045{
2046        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2047        int ret = -ENOTCONN;
2048
2049        if (!transport->inet) {
2050                struct sock *sk = sock->sk;
2051
2052                write_lock_bh(&sk->sk_callback_lock);
2053
2054                xs_save_old_callbacks(transport, sk);
2055
2056                sk->sk_user_data = xprt;
2057                sk->sk_data_ready = xs_tcp_data_ready;
2058                sk->sk_state_change = xs_tcp_state_change;
2059                sk->sk_write_space = xs_tcp_write_space;
2060                sk->sk_error_report = xs_error_report;
2061                sk->sk_allocation = GFP_ATOMIC;
2062
2063                /* socket options */
2064                sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
2065                sock_reset_flag(sk, SOCK_LINGER);
2066                tcp_sk(sk)->linger2 = 0;
2067                tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
2068
2069                xprt_clear_connected(xprt);
2070
2071                /* Reset to new socket */
2072                transport->sock = sock;
2073                transport->inet = sk;
2074
2075                write_unlock_bh(&sk->sk_callback_lock);
2076        }
2077
2078        if (!xprt_bound(xprt))
2079                goto out;
2080
2081        /* Tell the socket layer to start connecting... */
2082        xprt->stat.connect_count++;
2083        xprt->stat.connect_start = jiffies;
2084        ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2085        switch (ret) {
2086        case 0:
2087        case -EINPROGRESS:
2088                /* SYN_SENT! */
2089                xprt->connect_cookie++;
2090                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2091                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2092        }
2093out:
2094        return ret;
2095}
2096
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: work item embedded in the socket transport's connect_worker
 *
 * Creates a new socket if the transport has none, otherwise prepares
 * the existing one for reuse, then starts the connect.  Depending on
 * the connect status the function either returns with only the
 * "connecting" state cleared, or also wakes the pending tasks.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (sock) {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	} else {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/*
		 * Connected, still connecting, or to be retried with the
		 * existing socket after a delay: pending tasks are not
		 * woken here.
		 */
		xprt_clear_connecting(xprt);
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
2178
2179/**
2180 * xs_connect - connect a socket to a remote endpoint
2181 * @task: address of RPC task that manages state of connect request
2182 *
2183 * TCP: If the remote end dropped the connection, delay reconnecting.
2184 *
2185 * UDP socket connects are synchronous, but we use a work queue anyway
2186 * to guarantee that even unprivileged user processes can set up a
2187 * socket on a privileged port.
2188 *
2189 * If a UDP socket connect fails, the delay behavior here prevents
2190 * retry floods (hard mounts).
2191 */
2192static void xs_connect(struct rpc_task *task)
2193{
2194        struct rpc_xprt *xprt = task->tk_xprt;
2195        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2196
2197        if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2198                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2199                                "seconds\n",
2200                                xprt, xprt->reestablish_timeout / HZ);
2201                queue_delayed_work(rpciod_workqueue,
2202                                   &transport->connect_worker,
2203                                   xprt->reestablish_timeout);
2204                xprt->reestablish_timeout <<= 1;
2205                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2206                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2207                if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2208                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2209        } else {
2210                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2211                queue_delayed_work(rpciod_workqueue,
2212                                   &transport->connect_worker, 0);
2213        }
2214}
2215
2216/**
2217 * xs_local_print_stats - display AF_LOCAL socket-specifc stats
2218 * @xprt: rpc_xprt struct containing statistics
2219 * @seq: output file
2220 *
2221 */
2222static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2223{
2224        long idle_time = 0;
2225
2226        if (xprt_connected(xprt))
2227                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2228
2229        seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
2230                        "%llu %llu\n",
2231                        xprt->stat.bind_count,
2232                        xprt->stat.connect_count,
2233                        xprt->stat.connect_time,
2234                        idle_time,
2235                        xprt->stat.sends,
2236                        xprt->stat.recvs,
2237                        xprt->stat.bad_xids,
2238                        xprt->stat.req_u,
2239                        xprt->stat.bklog_u);
2240}
2241
2242/**
2243 * xs_udp_print_stats - display UDP socket-specifc stats
2244 * @xprt: rpc_xprt struct containing statistics
2245 * @seq: output file
2246 *
2247 */
2248static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2249{
2250        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2251
2252        seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2253                        transport->srcport,
2254                        xprt->stat.bind_count,
2255                        xprt->stat.sends,
2256                        xprt->stat.recvs,
2257                        xprt->stat.bad_xids,
2258                        xprt->stat.req_u,
2259                        xprt->stat.bklog_u);
2260}
2261
2262/**
2263 * xs_tcp_print_stats - display TCP socket-specifc stats
2264 * @xprt: rpc_xprt struct containing statistics
2265 * @seq: output file
2266 *
2267 */
2268static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2269{
2270        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2271        long idle_time = 0;
2272
2273        if (xprt_connected(xprt))
2274                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2275
2276        seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2277                        transport->srcport,
2278                        xprt->stat.bind_count,
2279                        xprt->stat.connect_count,
2280                        xprt->stat.connect_time,
2281                        idle_time,
2282                        xprt->stat.sends,
2283                        xprt->stat.recvs,
2284                        xprt->stat.bad_xids,
2285                        xprt->stat.req_u,
2286                        xprt->stat.bklog_u);
2287}
2288
2289/*
2290 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2291 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2292 * to use the server side send routines.
2293 */
2294static void *bc_malloc(struct rpc_task *task, size_t size)
2295{
2296        struct page *page;
2297        struct rpc_buffer *buf;
2298
2299        BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2300        page = alloc_page(GFP_KERNEL);
2301
2302        if (!page)
2303                return NULL;
2304
2305        buf = page_address(page);
2306        buf->len = PAGE_SIZE;
2307
2308        return buf->data;
2309}
2310
2311/*
2312 * Free the space allocated in the bc_alloc routine
2313 */
2314static void bc_free(void *buffer)
2315{
2316        struct rpc_buffer *buf;
2317
2318        if (!buffer)
2319                return;
2320
2321        buf = container_of(buffer, struct rpc_buffer, data);
2322        free_page((unsigned long)buf);
2323}
2324
2325/*
2326 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2327 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2328 */
2329static int bc_sendto(struct rpc_rqst *req)
2330{
2331        int len;
2332        struct xdr_buf *xbufp = &req->rq_snd_buf;
2333        struct rpc_xprt *xprt = req->rq_xprt;
2334        struct sock_xprt *transport =
2335                                container_of(xprt, struct sock_xprt, xprt);
2336        struct socket *sock = transport->sock;
2337        unsigned long headoff;
2338        unsigned long tailoff;
2339
2340        xs_encode_stream_record_marker(xbufp);
2341
2342        tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2343        headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2344        len = svc_send_common(sock, xbufp,
2345                              virt_to_page(xbufp->head[0].iov_base), headoff,
2346                              xbufp->tail[0].iov_base, tailoff);
2347
2348        if (len != xbufp->len) {
2349                printk(KERN_NOTICE "Error sending entire callback!\n");
2350                len = -EAGAIN;
2351        }
2352
2353        return len;
2354}
2355
2356/*
2357 * The send routine. Borrows from svc_send
2358 */
2359static int bc_send_request(struct rpc_task *task)
2360{
2361        struct rpc_rqst *req = task->tk_rqstp;
2362        struct svc_xprt *xprt;
2363        struct svc_sock         *svsk;
2364        u32                     len;
2365
2366        dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2367        /*
2368         * Get the server socket associated with this callback xprt
2369         */
2370        xprt = req->rq_xprt->bc_xprt;
2371        svsk = container_of(xprt, struct svc_sock, sk_xprt);
2372
2373        /*
2374         * Grab the mutex to serialize data as the connection is shared
2375         * with the fore channel
2376         */
2377        if (!mutex_trylock(&xprt->xpt_mutex)) {
2378                rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2379                if (!mutex_trylock(&xprt->xpt_mutex))
2380                        return -EAGAIN;
2381                rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2382        }
2383        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2384                len = -ENOTCONN;
2385        else
2386                len = bc_sendto(req);
2387        mutex_unlock(&xprt->xpt_mutex);
2388
2389        if (len > 0)
2390                len = 0;
2391
2392        return len;
2393}
2394
/*
 * The close routine for the backchannel transport. The connection is
 * client initiated, so there is nothing to do here.
 */
static void bc_close(struct rpc_xprt *xprt)
{
	/* intentionally empty */
}
2402
/*
 * The xprt destroy routine for the backchannel transport. Again, the
 * connection is client initiated, so there is nothing to do here.
 */
static void bc_destroy(struct rpc_xprt *xprt)
{
	/* intentionally empty */
}
2411
2412static struct rpc_xprt_ops xs_local_ops = {
2413        .reserve_xprt           = xprt_reserve_xprt,
2414        .release_xprt           = xs_tcp_release_xprt,
2415        .rpcbind                = xs_local_rpcbind,
2416        .set_port               = xs_local_set_port,
2417        .connect                = xs_connect,
2418        .buf_alloc              = rpc_malloc,
2419        .buf_free               = rpc_free,
2420        .send_request           = xs_local_send_request,
2421        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2422        .close                  = xs_close,
2423        .destroy                = xs_destroy,
2424        .print_stats            = xs_local_print_stats,
2425};
2426
2427static struct rpc_xprt_ops xs_udp_ops = {
2428        .set_buffer_size        = xs_udp_set_buffer_size,
2429        .reserve_xprt           = xprt_reserve_xprt_cong,
2430        .release_xprt           = xprt_release_xprt_cong,
2431        .rpcbind                = rpcb_getport_async,
2432        .set_port               = xs_set_port,
2433        .connect                = xs_connect,
2434        .buf_alloc              = rpc_malloc,
2435        .buf_free               = rpc_free,
2436        .send_request           = xs_udp_send_request,
2437        .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2438        .timer                  = xs_udp_timer,
2439        .release_request        = xprt_release_rqst_cong,
2440        .close                  = xs_close,
2441        .destroy                = xs_destroy,
2442        .print_stats            = xs_udp_print_stats,
2443};
2444
2445static struct rpc_xprt_ops xs_tcp_ops = {
2446        .reserve_xprt           = xprt_reserve_xprt,
2447        .release_xprt           = xs_tcp_release_xprt,
2448        .rpcbind                = rpcb_getport_async,
2449        .set_port               = xs_set_port,
2450        .connect                = xs_connect,
2451        .buf_alloc              = rpc_malloc,
2452        .buf_free               = rpc_free,
2453        .send_request           = xs_tcp_send_request,
2454        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2455        .close                  = xs_tcp_close,
2456        .destroy                = xs_destroy,
2457        .print_stats            = xs_tcp_print_stats,
2458};
2459
2460/*
2461 * The rpc_xprt_ops for the server backchannel
2462 */
2463
2464static struct rpc_xprt_ops bc_tcp_ops = {
2465        .reserve_xprt           = xprt_reserve_xprt,
2466        .release_xprt           = xprt_release_xprt,
2467        .buf_alloc              = bc_malloc,
2468        .buf_free               = bc_free,
2469        .send_request           = bc_send_request,
2470        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2471        .close                  = bc_close,
2472        .destroy                = bc_destroy,
2473        .print_stats            = xs_tcp_print_stats,
2474};
2475
2476static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2477{
2478        static const struct sockaddr_in sin = {
2479                .sin_family             = AF_INET,
2480                .sin_addr.s_addr        = htonl(INADDR_ANY),
2481        };
2482        static const struct sockaddr_in6 sin6 = {
2483                .sin6_family            = AF_INET6,
2484                .sin6_addr              = IN6ADDR_ANY_INIT,
2485        };
2486
2487        switch (family) {
2488        case AF_LOCAL:
2489                break;
2490        case AF_INET:
2491                memcpy(sap, &sin, sizeof(sin));
2492                break;
2493        case AF_INET6:
2494                memcpy(sap, &sin6, sizeof(sin6));
2495                break;
2496        default:
2497                dprintk("RPC:       %s: Bad address family\n", __func__);
2498                return -EAFNOSUPPORT;
2499        }
2500        return 0;
2501}
2502
2503static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2504                                      unsigned int slot_table_size,
2505                                      unsigned int max_slot_table_size)
2506{
2507        struct rpc_xprt *xprt;
2508        struct sock_xprt *new;
2509
2510        if (args->addrlen > sizeof(xprt->addr)) {
2511                dprintk("RPC:       xs_setup_xprt: address too large\n");
2512                return ERR_PTR(-EBADF);
2513        }
2514
2515        xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2516                        max_slot_table_size);
2517        if (xprt == NULL) {
2518                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2519                                "rpc_xprt\n");
2520                return ERR_PTR(-ENOMEM);
2521        }
2522
2523        new = container_of(xprt, struct sock_xprt, xprt);
2524        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2525        xprt->addrlen = args->addrlen;
2526        if (args->srcaddr)
2527                memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2528        else {
2529                int err;
2530                err = xs_init_anyaddr(args->dstaddr->sa_family,
2531                                        (struct sockaddr *)&new->srcaddr);
2532                if (err != 0) {
2533                        xprt_free(xprt);
2534                        return ERR_PTR(err);
2535                }
2536        }
2537
2538        return xprt;
2539}
2540
2541static const struct rpc_timeout xs_local_default_timeout = {
2542        .to_initval = 10 * HZ,
2543        .to_maxval = 10 * HZ,
2544        .to_retries = 2,
2545};
2546
2547/**
2548 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2549 * @args: rpc transport creation arguments
2550 *
2551 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2552 */
2553static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2554{
2555        struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2556        struct sock_xprt *transport;
2557        struct rpc_xprt *xprt;
2558        struct rpc_xprt *ret;
2559
2560        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2561                        xprt_max_tcp_slot_table_entries);
2562        if (IS_ERR(xprt))
2563                return xprt;
2564        transport = container_of(xprt, struct sock_xprt, xprt);
2565
2566        xprt->prot = 0;
2567        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2568        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2569
2570        xprt->bind_timeout = XS_BIND_TO;
2571        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2572        xprt->idle_timeout = XS_IDLE_DISC_TO;
2573
2574        xprt->ops = &xs_local_ops;
2575        xprt->timeout = &xs_local_default_timeout;
2576
2577        switch (sun->sun_family) {
2578        case AF_LOCAL:
2579                if (sun->sun_path[0] != '/') {
2580                        dprintk("RPC:       bad AF_LOCAL address: %s\n",
2581                                        sun->sun_path);
2582                        ret = ERR_PTR(-EINVAL);
2583                        goto out_err;
2584                }
2585                xprt_set_bound(xprt);
2586                INIT_DELAYED_WORK(&transport->connect_worker,
2587                                        xs_local_setup_socket);
2588                xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2589                break;
2590        default:
2591                ret = ERR_PTR(-EAFNOSUPPORT);
2592                goto out_err;
2593        }
2594
2595        dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2596                        xprt->address_strings[RPC_DISPLAY_ADDR]);
2597
2598        if (try_module_get(THIS_MODULE))
2599                return xprt;
2600        ret = ERR_PTR(-EINVAL);
2601out_err:
2602        xprt_free(xprt);
2603        return ret;
2604}
2605
2606static const struct rpc_timeout xs_udp_default_timeout = {
2607        .to_initval = 5 * HZ,
2608        .to_maxval = 30 * HZ,
2609        .to_increment = 5 * HZ,
2610        .to_retries = 5,
2611};
2612
2613/**
2614 * xs_setup_udp - Set up transport to use a UDP socket
2615 * @args: rpc transport creation arguments
2616 *
2617 */
2618static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2619{
2620        struct sockaddr *addr = args->dstaddr;
2621        struct rpc_xprt *xprt;
2622        struct sock_xprt *transport;
2623        struct rpc_xprt *ret;
2624
2625        xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2626                        xprt_udp_slot_table_entries);
2627        if (IS_ERR(xprt))
2628                return xprt;
2629        transport = container_of(xprt, struct sock_xprt, xprt);
2630
2631        xprt->prot = IPPROTO_UDP;
2632        xprt->tsh_size = 0;
2633        /* XXX: header size can vary due to auth type, IPv6, etc. */
2634        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2635
2636        xprt->bind_timeout = XS_BIND_TO;
2637        xprt->reestablish_timeout = XS_UDP_REEST_TO;
2638        xprt->idle_timeout = XS_IDLE_DISC_TO;
2639
2640        xprt->ops = &xs_udp_ops;
2641
2642        xprt->timeout = &xs_udp_default_timeout;
2643
2644        switch (addr->sa_family) {
2645        case AF_INET:
2646                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2647                        xprt_set_bound(xprt);
2648
2649                INIT_DELAYED_WORK(&transport->connect_worker,
2650                                        xs_udp_setup_socket);
2651                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2652                break;
2653        case AF_INET6:
2654                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2655                        xprt_set_bound(xprt);
2656
2657                INIT_DELAYED_WORK(&transport->connect_worker,
2658                                        xs_udp_setup_socket);
2659                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2660                break;
2661        default:
2662                ret = ERR_PTR(-EAFNOSUPPORT);
2663                goto out_err;
2664        }
2665
2666        if (xprt_bound(xprt))
2667                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2668                                xprt->address_strings[RPC_DISPLAY_ADDR],
2669                                xprt->address_strings[RPC_DISPLAY_PORT],
2670                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2671        else
2672                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2673                                xprt->address_strings[RPC_DISPLAY_ADDR],
2674                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2675
2676        if (try_module_get(THIS_MODULE))
2677                return xprt;
2678        ret = ERR_PTR(-EINVAL);
2679out_err:
2680        xprt_free(xprt);
2681        return ret;
2682}
2683
2684static const struct rpc_timeout xs_tcp_default_timeout = {
2685        .to_initval = 60 * HZ,
2686        .to_maxval = 60 * HZ,
2687        .to_retries = 2,
2688};
2689
2690/**
2691 * xs_setup_tcp - Set up transport to use a TCP socket
2692 * @args: rpc transport creation arguments
2693 *
2694 */
2695static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2696{
2697        struct sockaddr *addr = args->dstaddr;
2698        struct rpc_xprt *xprt;
2699        struct sock_xprt *transport;
2700        struct rpc_xprt *ret;
2701
2702        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2703                        xprt_max_tcp_slot_table_entries);
2704        if (IS_ERR(xprt))
2705                return xprt;
2706        transport = container_of(xprt, struct sock_xprt, xprt);
2707
2708        xprt->prot = IPPROTO_TCP;
2709        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2710        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2711
2712        xprt->bind_timeout = XS_BIND_TO;
2713        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2714        xprt->idle_timeout = XS_IDLE_DISC_TO;
2715
2716        xprt->ops = &xs_tcp_ops;
2717        xprt->timeout = &xs_tcp_default_timeout;
2718
2719        switch (addr->sa_family) {
2720        case AF_INET:
2721                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2722                        xprt_set_bound(xprt);
2723
2724                INIT_DELAYED_WORK(&transport->connect_worker,
2725                                        xs_tcp_setup_socket);
2726                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2727                break;
2728        case AF_INET6:
2729                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2730                        xprt_set_bound(xprt);
2731
2732                INIT_DELAYED_WORK(&transport->connect_worker,
2733                                        xs_tcp_setup_socket);
2734                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2735                break;
2736        default:
2737                ret = ERR_PTR(-EAFNOSUPPORT);
2738                goto out_err;
2739        }
2740
2741        if (xprt_bound(xprt))
2742                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2743                                xprt->address_strings[RPC_DISPLAY_ADDR],
2744                                xprt->address_strings[RPC_DISPLAY_PORT],
2745                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2746        else
2747                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2748                                xprt->address_strings[RPC_DISPLAY_ADDR],
2749                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2750
2751
2752        if (try_module_get(THIS_MODULE))
2753                return xprt;
2754        ret = ERR_PTR(-EINVAL);
2755out_err:
2756        xprt_free(xprt);
2757        return ret;
2758}
2759
2760/**
2761 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2762 * @args: rpc transport creation arguments
2763 *
2764 */
2765static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2766{
2767        struct sockaddr *addr = args->dstaddr;
2768        struct rpc_xprt *xprt;
2769        struct sock_xprt *transport;
2770        struct svc_sock *bc_sock;
2771        struct rpc_xprt *ret;
2772
2773        if (args->bc_xprt->xpt_bc_xprt) {
2774                /*
2775                 * This server connection already has a backchannel
2776                 * export; we can't create a new one, as we wouldn't be
2777                 * able to match replies based on xid any more.  So,
2778                 * reuse the already-existing one:
2779                 */
2780                 return args->bc_xprt->xpt_bc_xprt;
2781        }
2782        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2783                        xprt_tcp_slot_table_entries);
2784        if (IS_ERR(xprt))
2785                return xprt;
2786        transport = container_of(xprt, struct sock_xprt, xprt);
2787
2788        xprt->prot = IPPROTO_TCP;
2789        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2790        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2791        xprt->timeout = &xs_tcp_default_timeout;
2792
2793        /* backchannel */
2794        xprt_set_bound(xprt);
2795        xprt->bind_timeout = 0;
2796        xprt->reestablish_timeout = 0;
2797        xprt->idle_timeout = 0;
2798
2799        xprt->ops = &bc_tcp_ops;
2800
2801        switch (addr->sa_family) {
2802        case AF_INET:
2803                xs_format_peer_addresses(xprt, "tcp",
2804                                         RPCBIND_NETID_TCP);
2805                break;
2806        case AF_INET6:
2807                xs_format_peer_addresses(xprt, "tcp",
2808                                   RPCBIND_NETID_TCP6);
2809                break;
2810        default:
2811                ret = ERR_PTR(-EAFNOSUPPORT);
2812                goto out_err;
2813        }
2814
2815        dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2816                        xprt->address_strings[RPC_DISPLAY_ADDR],
2817                        xprt->address_strings[RPC_DISPLAY_PORT],
2818                        xprt->address_strings[RPC_DISPLAY_PROTO]);
2819
2820        /*
2821         * Once we've associated a backchannel xprt with a connection,
2822         * we want to keep it around as long as long as the connection
2823         * lasts, in case we need to start using it for a backchannel
2824         * again; this reference won't be dropped until bc_xprt is
2825         * destroyed.
2826         */
2827        xprt_get(xprt);
2828        args->bc_xprt->xpt_bc_xprt = xprt;
2829        xprt->bc_xprt = args->bc_xprt;
2830        bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2831        transport->sock = bc_sock->sk_sock;
2832        transport->inet = bc_sock->sk_sk;
2833
2834        /*
2835         * Since we don't want connections for the backchannel, we set
2836         * the xprt status to connected
2837         */
2838        xprt_set_connected(xprt);
2839
2840
2841        if (try_module_get(THIS_MODULE))
2842                return xprt;
2843        xprt_put(xprt);
2844        ret = ERR_PTR(-EINVAL);
2845out_err:
2846        xprt_free(xprt);
2847        return ret;
2848}
2849
2850static struct xprt_class        xs_local_transport = {
2851        .list           = LIST_HEAD_INIT(xs_local_transport.list),
2852        .name           = "named UNIX socket",
2853        .owner          = THIS_MODULE,
2854        .ident          = XPRT_TRANSPORT_LOCAL,
2855        .setup          = xs_setup_local,
2856};
2857
2858static struct xprt_class        xs_udp_transport = {
2859        .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2860        .name           = "udp",
2861        .owner          = THIS_MODULE,
2862        .ident          = XPRT_TRANSPORT_UDP,
2863        .setup          = xs_setup_udp,
2864};
2865
2866static struct xprt_class        xs_tcp_transport = {
2867        .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2868        .name           = "tcp",
2869        .owner          = THIS_MODULE,
2870        .ident          = XPRT_TRANSPORT_TCP,
2871        .setup          = xs_setup_tcp,
2872};
2873
2874static struct xprt_class        xs_bc_tcp_transport = {
2875        .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2876        .name           = "tcp NFSv4.1 backchannel",
2877        .owner          = THIS_MODULE,
2878        .ident          = XPRT_TRANSPORT_BC_TCP,
2879        .setup          = xs_setup_bc_tcp,
2880};
2881
2882/**
2883 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2884 *
2885 */
2886int init_socket_xprt(void)
2887{
2888#ifdef RPC_DEBUG
2889        if (!sunrpc_table_header)
2890                sunrpc_table_header = register_sysctl_table(sunrpc_table);
2891#endif
2892
2893        xprt_register_transport(&xs_local_transport);
2894        xprt_register_transport(&xs_udp_transport);
2895        xprt_register_transport(&xs_tcp_transport);
2896        xprt_register_transport(&xs_bc_tcp_transport);
2897
2898        return 0;
2899}
2900
2901/**
2902 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2903 *
2904 */
2905void cleanup_socket_xprt(void)
2906{
2907#ifdef RPC_DEBUG
2908        if (sunrpc_table_header) {
2909                unregister_sysctl_table(sunrpc_table_header);
2910                sunrpc_table_header = NULL;
2911        }
2912#endif
2913
2914        xprt_unregister_transport(&xs_local_transport);
2915        xprt_unregister_transport(&xs_udp_transport);
2916        xprt_unregister_transport(&xs_tcp_transport);
2917        xprt_unregister_transport(&xs_bc_tcp_transport);
2918}
2919
2920static int param_set_uint_minmax(const char *val,
2921                const struct kernel_param *kp,
2922                unsigned int min, unsigned int max)
2923{
2924        unsigned long num;
2925        int ret;
2926
2927        if (!val)
2928                return -EINVAL;
2929        ret = strict_strtoul(val, 0, &num);
2930        if (ret == -EINVAL || num < min || num > max)
2931                return -EINVAL;
2932        *((unsigned int *)kp->arg) = num;
2933        return 0;
2934}
2935
2936static int param_set_portnr(const char *val, const struct kernel_param *kp)
2937{
2938        return param_set_uint_minmax(val, kp,
2939                        RPC_MIN_RESVPORT,
2940                        RPC_MAX_RESVPORT);
2941}
2942
2943static struct kernel_param_ops param_ops_portnr = {
2944        .set = param_set_portnr,
2945        .get = param_get_uint,
2946};
2947
2948#define param_check_portnr(name, p) \
2949        __param_check(name, p, unsigned int);
2950
2951module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
2952module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2953
2954static int param_set_slot_table_size(const char *val,
2955                                     const struct kernel_param *kp)
2956{
2957        return param_set_uint_minmax(val, kp,
2958                        RPC_MIN_SLOT_TABLE,
2959                        RPC_MAX_SLOT_TABLE);
2960}
2961
2962static struct kernel_param_ops param_ops_slot_table_size = {
2963        .set = param_set_slot_table_size,
2964        .get = param_get_uint,
2965};
2966
2967#define param_check_slot_table_size(name, p) \
2968        __param_check(name, p, unsigned int);
2969
2970static int param_set_max_slot_table_size(const char *val,
2971                                     const struct kernel_param *kp)
2972{
2973        return param_set_uint_minmax(val, kp,
2974                        RPC_MIN_SLOT_TABLE,
2975                        RPC_MAX_SLOT_TABLE_LIMIT);
2976}
2977
2978static struct kernel_param_ops param_ops_max_slot_table_size = {
2979        .set = param_set_max_slot_table_size,
2980        .get = param_get_uint,
2981};
2982
2983#define param_check_max_slot_table_size(name, p) \
2984        __param_check(name, p, unsigned int);
2985
2986module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
2987                   slot_table_size, 0644);
2988module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
2989                   max_slot_table_size, 0644);
2990module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
2991                   slot_table_size, 0644);
2992
2993
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.