linux/net/sunrpc/xprtsock.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/xprtsock.c
   3 *
   4 * Client-side transport implementation for sockets.
   5 *
   6 * TCP callback races fixes (C) 1998 Red Hat
   7 * TCP send fixes (C) 1998 Red Hat
   8 * TCP NFS related read + write fixes
   9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  10 *
   11 * Rewrite of large parts of the code in order to stabilize TCP stuff.
  12 * Fix behaviour when socket buffer is full.
  13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  14 *
  15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
  16 *
  17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
  18 *   <gilles.quillard@bull.net>
  19 */
  20
  21#include <linux/types.h>
  22#include <linux/string.h>
  23#include <linux/slab.h>
  24#include <linux/module.h>
  25#include <linux/capability.h>
  26#include <linux/pagemap.h>
  27#include <linux/errno.h>
  28#include <linux/socket.h>
  29#include <linux/in.h>
  30#include <linux/net.h>
  31#include <linux/mm.h>
  32#include <linux/un.h>
  33#include <linux/udp.h>
  34#include <linux/tcp.h>
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/sched.h>
  37#include <linux/sunrpc/svcsock.h>
  38#include <linux/sunrpc/xprtsock.h>
  39#include <linux/file.h>
  40#ifdef CONFIG_SUNRPC_BACKCHANNEL
  41#include <linux/sunrpc/bc_xprt.h>
  42#endif
  43
  44#include <net/sock.h>
  45#include <net/checksum.h>
  46#include <net/udp.h>
  47#include <net/tcp.h>
  48
  49#include "sunrpc.h"
  50
  51static void xs_close(struct rpc_xprt *xprt);
  52
  53/*
  54 * xprtsock tunables
  55 */
  56static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
  57static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
  58static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
  59
  60static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
  61static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
  62
  63#define XS_TCP_LINGER_TO        (15U * HZ)
  64static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
  65
  66/*
  67 * We can register our own files under /proc/sys/sunrpc by
  68 * calling register_sysctl_table() again.  The files in that
  69 * directory become the union of all files registered there.
  70 *
  71 * We simply need to make sure that we don't collide with
  72 * someone else's file names!
  73 */
  74
  75#ifdef RPC_DEBUG
  76
  77static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
  78static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
  79static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
  80static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
  81static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
  82
  83static struct ctl_table_header *sunrpc_table_header;
  84
  85/*
  86 * FIXME: changing the UDP slot table size should also resize the UDP
  87 *        socket buffers for existing UDP transports
  88 */
  89static ctl_table xs_tunables_table[] = {
  90        {
  91                .procname       = "udp_slot_table_entries",
  92                .data           = &xprt_udp_slot_table_entries,
  93                .maxlen         = sizeof(unsigned int),
  94                .mode           = 0644,
  95                .proc_handler   = proc_dointvec_minmax,
  96                .extra1         = &min_slot_table_size,
  97                .extra2         = &max_slot_table_size
  98        },
  99        {
 100                .procname       = "tcp_slot_table_entries",
 101                .data           = &xprt_tcp_slot_table_entries,
 102                .maxlen         = sizeof(unsigned int),
 103                .mode           = 0644,
 104                .proc_handler   = proc_dointvec_minmax,
 105                .extra1         = &min_slot_table_size,
 106                .extra2         = &max_slot_table_size
 107        },
 108        {
 109                .procname       = "tcp_max_slot_table_entries",
 110                .data           = &xprt_max_tcp_slot_table_entries,
 111                .maxlen         = sizeof(unsigned int),
 112                .mode           = 0644,
 113                .proc_handler   = proc_dointvec_minmax,
 114                .extra1         = &min_slot_table_size,
 115                .extra2         = &max_tcp_slot_table_limit
 116        },
 117        {
 118                .procname       = "min_resvport",
 119                .data           = &xprt_min_resvport,
 120                .maxlen         = sizeof(unsigned int),
 121                .mode           = 0644,
 122                .proc_handler   = proc_dointvec_minmax,
 123                .extra1         = &xprt_min_resvport_limit,
 124                .extra2         = &xprt_max_resvport_limit
 125        },
 126        {
 127                .procname       = "max_resvport",
 128                .data           = &xprt_max_resvport,
 129                .maxlen         = sizeof(unsigned int),
 130                .mode           = 0644,
 131                .proc_handler   = proc_dointvec_minmax,
 132                .extra1         = &xprt_min_resvport_limit,
 133                .extra2         = &xprt_max_resvport_limit
 134        },
 135        {
 136                .procname       = "tcp_fin_timeout",
 137                .data           = &xs_tcp_fin_timeout,
 138                .maxlen         = sizeof(xs_tcp_fin_timeout),
 139                .mode           = 0644,
 140                .proc_handler   = proc_dointvec_jiffies,
 141        },
 142        { },
 143};
 144
 145static ctl_table sunrpc_table[] = {
 146        {
 147                .procname       = "sunrpc",
 148                .mode           = 0555,
 149                .child          = xs_tunables_table
 150        },
 151        { },
 152};
 153
 154#endif
 155
 156/*
 157 * Wait duration for a reply from the RPC portmapper.
 158 */
 159#define XS_BIND_TO              (60U * HZ)
 160
 161/*
 162 * Delay if a UDP socket connect error occurs.  This is most likely some
 163 * kind of resource problem on the local host.
 164 */
 165#define XS_UDP_REEST_TO         (2U * HZ)
 166
 167/*
 168 * The reestablish timeout allows clients to delay for a bit before attempting
 169 * to reconnect to a server that just dropped our connection.
 170 *
 171 * We implement an exponential backoff when trying to reestablish a TCP
 172 * transport connection with the server.  Some servers like to drop a TCP
 173 * connection when they are overworked, so we start with a short timeout and
 174 * increase over time if the server is down or not responding.
 175 */
 176#define XS_TCP_INIT_REEST_TO    (3U * HZ)
 177#define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
 178
 179/*
 180 * TCP idle timeout; client drops the transport socket if it is idle
 181 * for this long.  Note that we also timeout UDP sockets to prevent
 182 * holding port numbers when there is no RPC traffic.
 183 */
 184#define XS_IDLE_DISC_TO         (5U * 60 * HZ)
 185
 186#ifdef RPC_DEBUG
 187# undef  RPC_DEBUG_DATA
 188# define RPCDBG_FACILITY        RPCDBG_TRANS
 189#endif
 190
 191#ifdef RPC_DEBUG_DATA
 192static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 193{
 194        u8 *buf = (u8 *) packet;
 195        int j;
 196
 197        dprintk("RPC:       %s\n", msg);
 198        for (j = 0; j < count && j < 128; j += 4) {
 199                if (!(j & 31)) {
 200                        if (j)
 201                                dprintk("\n");
 202                        dprintk("0x%04x ", j);
 203                }
 204                dprintk("%02x%02x%02x%02x ",
 205                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
 206        }
 207        dprintk("\n");
 208}
 209#else
 210static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 211{
 212        /* NOP */
 213}
 214#endif
 215
 216struct sock_xprt {
 217        struct rpc_xprt         xprt;
 218
 219        /*
 220         * Network layer
 221         */
 222        struct socket *         sock;
 223        struct sock *           inet;
 224
 225        /*
 226         * State of TCP reply receive
 227         */
 228        __be32                  tcp_fraghdr,
 229                                tcp_xid,
 230                                tcp_calldir;
 231
 232        u32                     tcp_offset,
 233                                tcp_reclen;
 234
 235        unsigned long           tcp_copied,
 236                                tcp_flags;
 237
 238        /*
 239         * Connection of transports
 240         */
 241        struct delayed_work     connect_worker;
 242        struct sockaddr_storage srcaddr;
 243        unsigned short          srcport;
 244
 245        /*
 246         * UDP socket buffer size parameters
 247         */
 248        size_t                  rcvsize,
 249                                sndsize;
 250
 251        /*
 252         * Saved socket callback addresses
 253         */
 254        void                    (*old_data_ready)(struct sock *, int);
 255        void                    (*old_state_change)(struct sock *);
 256        void                    (*old_write_space)(struct sock *);
 257        void                    (*old_error_report)(struct sock *);
 258};
 259
 260/*
 261 * TCP receive state flags
 262 */
 263#define TCP_RCV_LAST_FRAG       (1UL << 0)
 264#define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
 265#define TCP_RCV_COPY_XID        (1UL << 2)
 266#define TCP_RCV_COPY_DATA       (1UL << 3)
 267#define TCP_RCV_READ_CALLDIR    (1UL << 4)
 268#define TCP_RCV_COPY_CALLDIR    (1UL << 5)
 269
 270/*
 271 * TCP RPC flags
 272 */
 273#define TCP_RPC_REPLY           (1UL << 6)
 274
 275static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 276{
 277        return (struct sockaddr *) &xprt->addr;
 278}
 279
 280static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
 281{
 282        return (struct sockaddr_un *) &xprt->addr;
 283}
 284
 285static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
 286{
 287        return (struct sockaddr_in *) &xprt->addr;
 288}
 289
 290static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
 291{
 292        return (struct sockaddr_in6 *) &xprt->addr;
 293}
 294
 295static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
 296{
 297        struct sockaddr *sap = xs_addr(xprt);
 298        struct sockaddr_in6 *sin6;
 299        struct sockaddr_in *sin;
 300        struct sockaddr_un *sun;
 301        char buf[128];
 302
 303        switch (sap->sa_family) {
 304        case AF_LOCAL:
 305                sun = xs_addr_un(xprt);
 306                strlcpy(buf, sun->sun_path, sizeof(buf));
 307                xprt->address_strings[RPC_DISPLAY_ADDR] =
 308                                                kstrdup(buf, GFP_KERNEL);
 309                break;
 310        case AF_INET:
 311                (void)rpc_ntop(sap, buf, sizeof(buf));
 312                xprt->address_strings[RPC_DISPLAY_ADDR] =
 313                                                kstrdup(buf, GFP_KERNEL);
 314                sin = xs_addr_in(xprt);
 315                snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 316                break;
 317        case AF_INET6:
 318                (void)rpc_ntop(sap, buf, sizeof(buf));
 319                xprt->address_strings[RPC_DISPLAY_ADDR] =
 320                                                kstrdup(buf, GFP_KERNEL);
 321                sin6 = xs_addr_in6(xprt);
 322                snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 323                break;
 324        default:
 325                BUG();
 326        }
 327
 328        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 329}
 330
 331static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
 332{
 333        struct sockaddr *sap = xs_addr(xprt);
 334        char buf[128];
 335
 336        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 337        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 338
 339        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 340        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 341}
 342
 343static void xs_format_peer_addresses(struct rpc_xprt *xprt,
 344                                     const char *protocol,
 345                                     const char *netid)
 346{
 347        xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
 348        xprt->address_strings[RPC_DISPLAY_NETID] = netid;
 349        xs_format_common_peer_addresses(xprt);
 350        xs_format_common_peer_ports(xprt);
 351}
 352
 353static void xs_update_peer_port(struct rpc_xprt *xprt)
 354{
 355        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 356        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 357
 358        xs_format_common_peer_ports(xprt);
 359}
 360
 361static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 362{
 363        unsigned int i;
 364
 365        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 366                switch (i) {
 367                case RPC_DISPLAY_PROTO:
 368                case RPC_DISPLAY_NETID:
 369                        continue;
 370                default:
 371                        kfree(xprt->address_strings[i]);
 372                }
 373}
 374
 375#define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
 376
 377static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 378{
 379        struct msghdr msg = {
 380                .msg_name       = addr,
 381                .msg_namelen    = addrlen,
 382                .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
 383        };
 384        struct kvec iov = {
 385                .iov_base       = vec->iov_base + base,
 386                .iov_len        = vec->iov_len - base,
 387        };
 388
 389        if (iov.iov_len != 0)
 390                return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
 391        return kernel_sendmsg(sock, &msg, NULL, 0, 0);
 392}
 393
 394static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
 395{
 396        struct page **ppage;
 397        unsigned int remainder;
 398        int err, sent = 0;
 399
 400        remainder = xdr->page_len - base;
 401        base += xdr->page_base;
 402        ppage = xdr->pages + (base >> PAGE_SHIFT);
 403        base &= ~PAGE_MASK;
 404        for(;;) {
 405                unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
 406                int flags = XS_SENDMSG_FLAGS;
 407
 408                remainder -= len;
 409                if (remainder != 0 || more)
 410                        flags |= MSG_MORE;
 411                err = sock->ops->sendpage(sock, *ppage, base, len, flags);
 412                if (remainder == 0 || err != len)
 413                        break;
 414                sent += err;
 415                ppage++;
 416                base = 0;
 417        }
 418        if (sent == 0)
 419                return err;
 420        if (err > 0)
 421                sent += err;
 422        return sent;
 423}
 424
 425/**
 426 * xs_sendpages - write pages directly to a socket
 427 * @sock: socket to send on
 428 * @addr: UDP only -- address of destination
 429 * @addrlen: UDP only -- length of destination address
 430 * @xdr: buffer containing this request
 431 * @base: starting position in the buffer
 432 *
 433 */
 434static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
 435{
 436        unsigned int remainder = xdr->len - base;
 437        int err, sent = 0;
 438
 439        if (unlikely(!sock))
 440                return -ENOTSOCK;
 441
 442        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
 443        if (base != 0) {
 444                addr = NULL;
 445                addrlen = 0;
 446        }
 447
 448        if (base < xdr->head[0].iov_len || addr != NULL) {
 449                unsigned int len = xdr->head[0].iov_len - base;
 450                remainder -= len;
 451                err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
 452                if (remainder == 0 || err != len)
 453                        goto out;
 454                sent += err;
 455                base = 0;
 456        } else
 457                base -= xdr->head[0].iov_len;
 458
 459        if (base < xdr->page_len) {
 460                unsigned int len = xdr->page_len - base;
 461                remainder -= len;
 462                err = xs_send_pagedata(sock, xdr, base, remainder != 0);
 463                if (remainder == 0 || err != len)
 464                        goto out;
 465                sent += err;
 466                base = 0;
 467        } else
 468                base -= xdr->page_len;
 469
 470        if (base >= xdr->tail[0].iov_len)
 471                return sent;
 472        err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
 473out:
 474        if (sent == 0)
 475                return err;
 476        if (err > 0)
 477                sent += err;
 478        return sent;
 479}
 480
 481static void xs_nospace_callback(struct rpc_task *task)
 482{
 483        struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
 484
 485        transport->inet->sk_write_pending--;
 486        clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 487}
 488
 489/**
 490 * xs_nospace - place task on wait queue if transmit was incomplete
 491 * @task: task to put to sleep
 492 *
 493 */
 494static int xs_nospace(struct rpc_task *task)
 495{
 496        struct rpc_rqst *req = task->tk_rqstp;
 497        struct rpc_xprt *xprt = req->rq_xprt;
 498        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 499        int ret = -EAGAIN;
 500
 501        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
 502                        task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 503                        req->rq_slen);
 504
 505        /* Protect against races with write_space */
 506        spin_lock_bh(&xprt->transport_lock);
 507
 508        /* Don't race with disconnect */
 509        if (xprt_connected(xprt)) {
 510                if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
 511                        /*
 512                         * Notify TCP that we're limited by the application
 513                         * window size
 514                         */
 515                        set_bit(SOCK_NOSPACE, &transport->sock->flags);
 516                        transport->inet->sk_write_pending++;
 517                        /* ...and wait for more buffer space */
 518                        xprt_wait_for_buffer_space(task, xs_nospace_callback);
 519                }
 520        } else {
 521                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 522                ret = -ENOTCONN;
 523        }
 524
 525        spin_unlock_bh(&xprt->transport_lock);
 526        return ret;
 527}
 528
 529/*
 530 * Construct a stream transport record marker in @buf.
 531 */
 532static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 533{
 534        u32 reclen = buf->len - sizeof(rpc_fraghdr);
 535        rpc_fraghdr *base = buf->head[0].iov_base;
 536        *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
 537}
 538
 539/**
 540 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 541 * @task: RPC task that manages the state of an RPC request
 542 *
 543 * Return values:
 544 *        0:    The request has been sent
 545 *   EAGAIN:    The socket was blocked, please call again later to
 546 *              complete the request
 547 * ENOTCONN:    Caller needs to invoke connect logic then call again
 548 *    other:    Some other error occured, the request was not sent
 549 */
 550static int xs_local_send_request(struct rpc_task *task)
 551{
 552        struct rpc_rqst *req = task->tk_rqstp;
 553        struct rpc_xprt *xprt = req->rq_xprt;
 554        struct sock_xprt *transport =
 555                                container_of(xprt, struct sock_xprt, xprt);
 556        struct xdr_buf *xdr = &req->rq_snd_buf;
 557        int status;
 558
 559        xs_encode_stream_record_marker(&req->rq_snd_buf);
 560
 561        xs_pktdump("packet data:",
 562                        req->rq_svec->iov_base, req->rq_svec->iov_len);
 563
 564        status = xs_sendpages(transport->sock, NULL, 0,
 565                                                xdr, req->rq_bytes_sent);
 566        dprintk("RPC:       %s(%u) = %d\n",
 567                        __func__, xdr->len - req->rq_bytes_sent, status);
 568        if (likely(status >= 0)) {
 569                req->rq_bytes_sent += status;
 570                req->rq_xmit_bytes_sent += status;
 571                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 572                        req->rq_bytes_sent = 0;
 573                        return 0;
 574                }
 575                status = -EAGAIN;
 576        }
 577
 578        switch (status) {
 579        case -EAGAIN:
 580                status = xs_nospace(task);
 581                break;
 582        default:
 583                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 584                        -status);
 585        case -EPIPE:
 586                xs_close(xprt);
 587                status = -ENOTCONN;
 588        }
 589
 590        return status;
 591}
 592
 593/**
 594 * xs_udp_send_request - write an RPC request to a UDP socket
 595 * @task: address of RPC task that manages the state of an RPC request
 596 *
 597 * Return values:
 598 *        0:    The request has been sent
 599 *   EAGAIN:    The socket was blocked, please call again later to
 600 *              complete the request
 601 * ENOTCONN:    Caller needs to invoke connect logic then call again
 602 *    other:    Some other error occurred, the request was not sent
 603 */
 604static int xs_udp_send_request(struct rpc_task *task)
 605{
 606        struct rpc_rqst *req = task->tk_rqstp;
 607        struct rpc_xprt *xprt = req->rq_xprt;
 608        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 609        struct xdr_buf *xdr = &req->rq_snd_buf;
 610        int status;
 611
 612        xs_pktdump("packet data:",
 613                                req->rq_svec->iov_base,
 614                                req->rq_svec->iov_len);
 615
 616        if (!xprt_bound(xprt))
 617                return -ENOTCONN;
 618        status = xs_sendpages(transport->sock,
 619                              xs_addr(xprt),
 620                              xprt->addrlen, xdr,
 621                              req->rq_bytes_sent);
 622
 623        dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
 624                        xdr->len - req->rq_bytes_sent, status);
 625
 626        if (status >= 0) {
 627                req->rq_xmit_bytes_sent += status;
 628                if (status >= req->rq_slen)
 629                        return 0;
 630                /* Still some bytes left; set up for a retry later. */
 631                status = -EAGAIN;
 632        }
 633
 634        switch (status) {
 635        case -ENOTSOCK:
 636                status = -ENOTCONN;
 637                /* Should we call xs_close() here? */
 638                break;
 639        case -EAGAIN:
 640                status = xs_nospace(task);
 641                break;
 642        default:
 643                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 644                        -status);
 645        case -ENETUNREACH:
 646        case -EPIPE:
 647        case -ECONNREFUSED:
 648                /* When the server has died, an ICMP port unreachable message
 649                 * prompts ECONNREFUSED. */
 650                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 651        }
 652
 653        return status;
 654}
 655
 656/**
 657 * xs_tcp_shutdown - gracefully shut down a TCP socket
 658 * @xprt: transport
 659 *
 660 * Initiates a graceful shutdown of the TCP socket by calling the
 661 * equivalent of shutdown(SHUT_WR);
 662 */
 663static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 664{
 665        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 666        struct socket *sock = transport->sock;
 667
 668        if (sock != NULL)
 669                kernel_sock_shutdown(sock, SHUT_WR);
 670}
 671
 672/**
 673 * xs_tcp_send_request - write an RPC request to a TCP socket
 674 * @task: address of RPC task that manages the state of an RPC request
 675 *
 676 * Return values:
 677 *        0:    The request has been sent
 678 *   EAGAIN:    The socket was blocked, please call again later to
 679 *              complete the request
 680 * ENOTCONN:    Caller needs to invoke connect logic then call again
 681 *    other:    Some other error occurred, the request was not sent
 682 *
 683 * XXX: In the case of soft timeouts, should we eventually give up
 684 *      if sendmsg is not able to make progress?
 685 */
 686static int xs_tcp_send_request(struct rpc_task *task)
 687{
 688        struct rpc_rqst *req = task->tk_rqstp;
 689        struct rpc_xprt *xprt = req->rq_xprt;
 690        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 691        struct xdr_buf *xdr = &req->rq_snd_buf;
 692        int status;
 693
 694        xs_encode_stream_record_marker(&req->rq_snd_buf);
 695
 696        xs_pktdump("packet data:",
 697                                req->rq_svec->iov_base,
 698                                req->rq_svec->iov_len);
 699
 700        /* Continue transmitting the packet/record. We must be careful
 701         * to cope with writespace callbacks arriving _after_ we have
 702         * called sendmsg(). */
 703        while (1) {
 704                status = xs_sendpages(transport->sock,
 705                                        NULL, 0, xdr, req->rq_bytes_sent);
 706
 707                dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
 708                                xdr->len - req->rq_bytes_sent, status);
 709
 710                if (unlikely(status < 0))
 711                        break;
 712
 713                /* If we've sent the entire packet, immediately
 714                 * reset the count of bytes sent. */
 715                req->rq_bytes_sent += status;
 716                req->rq_xmit_bytes_sent += status;
 717                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 718                        req->rq_bytes_sent = 0;
 719                        return 0;
 720                }
 721
 722                if (status != 0)
 723                        continue;
 724                status = -EAGAIN;
 725                break;
 726        }
 727
 728        switch (status) {
 729        case -ENOTSOCK:
 730                status = -ENOTCONN;
 731                /* Should we call xs_close() here? */
 732                break;
 733        case -EAGAIN:
 734                status = xs_nospace(task);
 735                break;
 736        default:
 737                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 738                        -status);
 739        case -ECONNRESET:
 740        case -EPIPE:
 741                xs_tcp_shutdown(xprt);
 742        case -ECONNREFUSED:
 743        case -ENOTCONN:
 744                clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
 745        }
 746
 747        return status;
 748}
 749
 750/**
 751 * xs_tcp_release_xprt - clean up after a tcp transmission
 752 * @xprt: transport
 753 * @task: rpc task
 754 *
 755 * This cleans up if an error causes us to abort the transmission of a request.
 756 * In this case, the socket may need to be reset in order to avoid confusing
 757 * the server.
 758 */
 759static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 760{
 761        struct rpc_rqst *req;
 762
 763        if (task != xprt->snd_task)
 764                return;
 765        if (task == NULL)
 766                goto out_release;
 767        req = task->tk_rqstp;
 768        if (req == NULL)
 769                goto out_release;
 770        if (req->rq_bytes_sent == 0)
 771                goto out_release;
 772        if (req->rq_bytes_sent == req->rq_snd_buf.len)
 773                goto out_release;
 774        set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
 775out_release:
 776        xprt_release_xprt(xprt, task);
 777}
 778
 779static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 780{
 781        transport->old_data_ready = sk->sk_data_ready;
 782        transport->old_state_change = sk->sk_state_change;
 783        transport->old_write_space = sk->sk_write_space;
 784        transport->old_error_report = sk->sk_error_report;
 785}
 786
 787static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 788{
 789        sk->sk_data_ready = transport->old_data_ready;
 790        sk->sk_state_change = transport->old_state_change;
 791        sk->sk_write_space = transport->old_write_space;
 792        sk->sk_error_report = transport->old_error_report;
 793}
 794
 795static void xs_reset_transport(struct sock_xprt *transport)
 796{
 797        struct socket *sock = transport->sock;
 798        struct sock *sk = transport->inet;
 799
 800        if (sk == NULL)
 801                return;
 802
 803        transport->srcport = 0;
 804
 805        write_lock_bh(&sk->sk_callback_lock);
 806        transport->inet = NULL;
 807        transport->sock = NULL;
 808
 809        sk->sk_user_data = NULL;
 810
 811        xs_restore_old_callbacks(transport, sk);
 812        write_unlock_bh(&sk->sk_callback_lock);
 813
 814        sk->sk_no_check = 0;
 815
 816        sock_release(sock);
 817}
 818
 819/**
 820 * xs_close - close a socket
 821 * @xprt: transport
 822 *
 823 * This is used when all requests are complete; ie, no DRC state remains
 824 * on the server we want to save.
 825 *
 826 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 827 * xs_reset_transport() zeroing the socket from underneath a writer.
 828 */
 829static void xs_close(struct rpc_xprt *xprt)
 830{
 831        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 832
 833        dprintk("RPC:       xs_close xprt %p\n", xprt);
 834
 835        xs_reset_transport(transport);
 836        xprt->reestablish_timeout = 0;
 837
 838        smp_mb__before_clear_bit();
 839        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
 840        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 841        clear_bit(XPRT_CLOSING, &xprt->state);
 842        smp_mb__after_clear_bit();
 843        xprt_disconnect_done(xprt);
 844}
 845
 846static void xs_tcp_close(struct rpc_xprt *xprt)
 847{
 848        if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
 849                xs_close(xprt);
 850        else
 851                xs_tcp_shutdown(xprt);
 852}
 853
 854/**
 855 * xs_destroy - prepare to shutdown a transport
 856 * @xprt: doomed transport
 857 *
 858 */
 859static void xs_destroy(struct rpc_xprt *xprt)
 860{
 861        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 862
 863        dprintk("RPC:       xs_destroy xprt %p\n", xprt);
 864
 865        cancel_delayed_work_sync(&transport->connect_worker);
 866
 867        xs_close(xprt);
 868        xs_free_peer_addresses(xprt);
 869        xprt_free(xprt);
 870        module_put(THIS_MODULE);
 871}
 872
 873static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
 874{
 875        return (struct rpc_xprt *) sk->sk_user_data;
 876}
 877
 878static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
 879{
 880        struct xdr_skb_reader desc = {
 881                .skb            = skb,
 882                .offset         = sizeof(rpc_fraghdr),
 883                .count          = skb->len - sizeof(rpc_fraghdr),
 884        };
 885
 886        if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
 887                return -1;
 888        if (desc.count)
 889                return -1;
 890        return 0;
 891}
 892
 893/**
 894 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 895 * @sk: socket with data to read
 896 * @len: how much data to read
 897 *
 898 * Currently this assumes we can read the whole reply in a single gulp.
 899 */
 900static void xs_local_data_ready(struct sock *sk, int len)
 901{
 902        struct rpc_task *task;
 903        struct rpc_xprt *xprt;
 904        struct rpc_rqst *rovr;
 905        struct sk_buff *skb;
 906        int err, repsize, copied;
 907        u32 _xid;
 908        __be32 *xp;
 909
 910        read_lock_bh(&sk->sk_callback_lock);
 911        dprintk("RPC:       %s...\n", __func__);
 912        xprt = xprt_from_sock(sk);
 913        if (xprt == NULL)
 914                goto out;
 915
 916        skb = skb_recv_datagram(sk, 0, 1, &err);
 917        if (skb == NULL)
 918                goto out;
 919
 920        if (xprt->shutdown)
 921                goto dropit;
 922
 923        repsize = skb->len - sizeof(rpc_fraghdr);
 924        if (repsize < 4) {
 925                dprintk("RPC:       impossible RPC reply size %d\n", repsize);
 926                goto dropit;
 927        }
 928
 929        /* Copy the XID from the skb... */
 930        xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
 931        if (xp == NULL)
 932                goto dropit;
 933
 934        /* Look up and lock the request corresponding to the given XID */
 935        spin_lock(&xprt->transport_lock);
 936        rovr = xprt_lookup_rqst(xprt, *xp);
 937        if (!rovr)
 938                goto out_unlock;
 939        task = rovr->rq_task;
 940
 941        copied = rovr->rq_private_buf.buflen;
 942        if (copied > repsize)
 943                copied = repsize;
 944
 945        if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 946                dprintk("RPC:       sk_buff copy failed\n");
 947                goto out_unlock;
 948        }
 949
 950        xprt_complete_rqst(task, copied);
 951
 952 out_unlock:
 953        spin_unlock(&xprt->transport_lock);
 954 dropit:
 955        skb_free_datagram(sk, skb);
 956 out:
 957        read_unlock_bh(&sk->sk_callback_lock);
 958}
 959
 960/**
 961 * xs_udp_data_ready - "data ready" callback for UDP sockets
 962 * @sk: socket with data to read
 963 * @len: how much data to read
 964 *
 965 */
 966static void xs_udp_data_ready(struct sock *sk, int len)
 967{
 968        struct rpc_task *task;
 969        struct rpc_xprt *xprt;
 970        struct rpc_rqst *rovr;
 971        struct sk_buff *skb;
 972        int err, repsize, copied;
 973        u32 _xid;
 974        __be32 *xp;
 975
 976        read_lock_bh(&sk->sk_callback_lock);
 977        dprintk("RPC:       xs_udp_data_ready...\n");
 978        if (!(xprt = xprt_from_sock(sk)))
 979                goto out;
 980
 981        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 982                goto out;
 983
 984        if (xprt->shutdown)
 985                goto dropit;
 986
 987        repsize = skb->len - sizeof(struct udphdr);
 988        if (repsize < 4) {
 989                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
 990                goto dropit;
 991        }
 992
 993        /* Copy the XID from the skb... */
 994        xp = skb_header_pointer(skb, sizeof(struct udphdr),
 995                                sizeof(_xid), &_xid);
 996        if (xp == NULL)
 997                goto dropit;
 998
 999        /* Look up and lock the request corresponding to the given XID */
1000        spin_lock(&xprt->transport_lock);
1001        rovr = xprt_lookup_rqst(xprt, *xp);
1002        if (!rovr)
1003                goto out_unlock;
1004        task = rovr->rq_task;
1005
1006        if ((copied = rovr->rq_private_buf.buflen) > repsize)
1007                copied = repsize;
1008
1009        /* Suck it into the iovec, verify checksum if not done by hw. */
1010        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1011                UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1012                goto out_unlock;
1013        }
1014
1015        UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1016
1017        xprt_adjust_cwnd(task, copied);
1018        xprt_complete_rqst(task, copied);
1019
1020 out_unlock:
1021        spin_unlock(&xprt->transport_lock);
1022 dropit:
1023        skb_free_datagram(sk, skb);
1024 out:
1025        read_unlock_bh(&sk->sk_callback_lock);
1026}
1027
1028static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1029{
1030        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1031        size_t len, used;
1032        char *p;
1033
1034        p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
1035        len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
1036        used = xdr_skb_read_bits(desc, p, len);
1037        transport->tcp_offset += used;
1038        if (used != len)
1039                return;
1040
1041        transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
1042        if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
1043                transport->tcp_flags |= TCP_RCV_LAST_FRAG;
1044        else
1045                transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
1046        transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
1047
1048        transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
1049        transport->tcp_offset = 0;
1050
1051        /* Sanity check of the record length */
1052        if (unlikely(transport->tcp_reclen < 8)) {
1053                dprintk("RPC:       invalid TCP record fragment length\n");
1054                xprt_force_disconnect(xprt);
1055                return;
1056        }
1057        dprintk("RPC:       reading TCP record fragment of length %d\n",
1058                        transport->tcp_reclen);
1059}
1060
1061static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1062{
1063        if (transport->tcp_offset == transport->tcp_reclen) {
1064                transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1065                transport->tcp_offset = 0;
1066                if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1067                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1068                        transport->tcp_flags |= TCP_RCV_COPY_XID;
1069                        transport->tcp_copied = 0;
1070                }
1071        }
1072}
1073
1074static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1075{
1076        size_t len, used;
1077        char *p;
1078
1079        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
1080        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
1081        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
1082        used = xdr_skb_read_bits(desc, p, len);
1083        transport->tcp_offset += used;
1084        if (used != len)
1085                return;
1086        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1087        transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1088        transport->tcp_copied = 4;
1089        dprintk("RPC:       reading %s XID %08x\n",
1090                        (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1091                                                              : "request with",
1092                        ntohl(transport->tcp_xid));
1093        xs_tcp_check_fraghdr(transport);
1094}
1095
1096static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1097                                       struct xdr_skb_reader *desc)
1098{
1099        size_t len, used;
1100        u32 offset;
1101        char *p;
1102
1103        /*
1104         * We want transport->tcp_offset to be 8 at the end of this routine
1105         * (4 bytes for the xid and 4 bytes for the call/reply flag).
1106         * When this function is called for the first time,
1107         * transport->tcp_offset is 4 (after having already read the xid).
1108         */
1109        offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1110        len = sizeof(transport->tcp_calldir) - offset;
1111        dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1112        p = ((char *) &transport->tcp_calldir) + offset;
1113        used = xdr_skb_read_bits(desc, p, len);
1114        transport->tcp_offset += used;
1115        if (used != len)
1116                return;
1117        transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1118        /*
1119         * We don't yet have the XDR buffer, so we will write the calldir
1120         * out after we get the buffer from the 'struct rpc_rqst'
1121         */
1122        switch (ntohl(transport->tcp_calldir)) {
1123        case RPC_REPLY:
1124                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1125                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1126                transport->tcp_flags |= TCP_RPC_REPLY;
1127                break;
1128        case RPC_CALL:
1129                transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1130                transport->tcp_flags |= TCP_RCV_COPY_DATA;
1131                transport->tcp_flags &= ~TCP_RPC_REPLY;
1132                break;
1133        default:
1134                dprintk("RPC:       invalid request message type\n");
1135                xprt_force_disconnect(&transport->xprt);
1136        }
1137        xs_tcp_check_fraghdr(transport);
1138}
1139
1140static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1141                                     struct xdr_skb_reader *desc,
1142                                     struct rpc_rqst *req)
1143{
1144        struct sock_xprt *transport =
1145                                container_of(xprt, struct sock_xprt, xprt);
1146        struct xdr_buf *rcvbuf;
1147        size_t len;
1148        ssize_t r;
1149
1150        rcvbuf = &req->rq_private_buf;
1151
1152        if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1153                /*
1154                 * Save the RPC direction in the XDR buffer
1155                 */
1156                memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1157                        &transport->tcp_calldir,
1158                        sizeof(transport->tcp_calldir));
1159                transport->tcp_copied += sizeof(transport->tcp_calldir);
1160                transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1161        }
1162
1163        len = desc->count;
1164        if (len > transport->tcp_reclen - transport->tcp_offset) {
1165                struct xdr_skb_reader my_desc;
1166
1167                len = transport->tcp_reclen - transport->tcp_offset;
1168                memcpy(&my_desc, desc, sizeof(my_desc));
1169                my_desc.count = len;
1170                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1171                                          &my_desc, xdr_skb_read_bits);
1172                desc->count -= r;
1173                desc->offset += r;
1174        } else
1175                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1176                                          desc, xdr_skb_read_bits);
1177
1178        if (r > 0) {
1179                transport->tcp_copied += r;
1180                transport->tcp_offset += r;
1181        }
1182        if (r != len) {
1183                /* Error when copying to the receive buffer,
1184                 * usually because we weren't able to allocate
1185                 * additional buffer pages. All we can do now
1186                 * is turn off TCP_RCV_COPY_DATA, so the request
1187                 * will not receive any additional updates,
1188                 * and time out.
1189                 * Any remaining data from this record will
1190                 * be discarded.
1191                 */
1192                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1193                dprintk("RPC:       XID %08x truncated request\n",
1194                                ntohl(transport->tcp_xid));
1195                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1196                                "tcp_offset = %u, tcp_reclen = %u\n",
1197                                xprt, transport->tcp_copied,
1198                                transport->tcp_offset, transport->tcp_reclen);
1199                return;
1200        }
1201
1202        dprintk("RPC:       XID %08x read %Zd bytes\n",
1203                        ntohl(transport->tcp_xid), r);
1204        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1205                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1206                        transport->tcp_offset, transport->tcp_reclen);
1207
1208        if (transport->tcp_copied == req->rq_private_buf.buflen)
1209                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1210        else if (transport->tcp_offset == transport->tcp_reclen) {
1211                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1212                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1213        }
1214}
1215
1216/*
1217 * Finds the request corresponding to the RPC xid and invokes the common
1218 * tcp read code to read the data.
1219 */
1220static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1221                                    struct xdr_skb_reader *desc)
1222{
1223        struct sock_xprt *transport =
1224                                container_of(xprt, struct sock_xprt, xprt);
1225        struct rpc_rqst *req;
1226
1227        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1228
1229        /* Find and lock the request corresponding to this xid */
1230        spin_lock(&xprt->transport_lock);
1231        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1232        if (!req) {
1233                dprintk("RPC:       XID %08x request not found!\n",
1234                                ntohl(transport->tcp_xid));
1235                spin_unlock(&xprt->transport_lock);
1236                return -1;
1237        }
1238
1239        xs_tcp_read_common(xprt, desc, req);
1240
1241        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1242                xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1243
1244        spin_unlock(&xprt->transport_lock);
1245        return 0;
1246}
1247
1248#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1249/*
1250 * Obtains an rpc_rqst previously allocated and invokes the common
1251 * tcp read code to read the data.  The result is placed in the callback
1252 * queue.
1253 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1254 * connection and return -1.
1255 */
1256static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1257                                       struct xdr_skb_reader *desc)
1258{
1259        struct sock_xprt *transport =
1260                                container_of(xprt, struct sock_xprt, xprt);
1261        struct rpc_rqst *req;
1262
1263        req = xprt_alloc_bc_request(xprt);
1264        if (req == NULL) {
1265                printk(KERN_WARNING "Callback slot table overflowed\n");
1266                xprt_force_disconnect(xprt);
1267                return -1;
1268        }
1269
1270        req->rq_xid = transport->tcp_xid;
1271        dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1272        xs_tcp_read_common(xprt, desc, req);
1273
1274        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1275                struct svc_serv *bc_serv = xprt->bc_serv;
1276
1277                /*
1278                 * Add callback request to callback list.  The callback
1279                 * service sleeps on the sv_cb_waitq waiting for new
1280                 * requests.  Wake it up after adding enqueing the
1281                 * request.
1282                 */
1283                dprintk("RPC:       add callback request to list\n");
1284                spin_lock(&bc_serv->sv_cb_lock);
1285                list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1286                spin_unlock(&bc_serv->sv_cb_lock);
1287                wake_up(&bc_serv->sv_cb_waitq);
1288        }
1289
1290        req->rq_private_buf.len = transport->tcp_copied;
1291
1292        return 0;
1293}
1294
1295static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1296                                        struct xdr_skb_reader *desc)
1297{
1298        struct sock_xprt *transport =
1299                                container_of(xprt, struct sock_xprt, xprt);
1300
1301        return (transport->tcp_flags & TCP_RPC_REPLY) ?
1302                xs_tcp_read_reply(xprt, desc) :
1303                xs_tcp_read_callback(xprt, desc);
1304}
1305#else
/* Without backchannel support every message is handled as a reply. */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
                                        struct xdr_skb_reader *desc)
{
        int status = xs_tcp_read_reply(xprt, desc);

        return status;
}
1311#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1312
1313/*
1314 * Read data off the transport.  This can be either an RPC_CALL or an
1315 * RPC_REPLY.  Relay the processing to helper functions.
1316 */
1317static void xs_tcp_read_data(struct rpc_xprt *xprt,
1318                                    struct xdr_skb_reader *desc)
1319{
1320        struct sock_xprt *transport =
1321                                container_of(xprt, struct sock_xprt, xprt);
1322
1323        if (_xs_tcp_read_data(xprt, desc) == 0)
1324                xs_tcp_check_fraghdr(transport);
1325        else {
1326                /*
1327                 * The transport_lock protects the request handling.
1328                 * There's no need to hold it to update the tcp_flags.
1329                 */
1330                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1331        }
1332}
1333
1334static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1335{
1336        size_t len;
1337
1338        len = transport->tcp_reclen - transport->tcp_offset;
1339        if (len > desc->count)
1340                len = desc->count;
1341        desc->count -= len;
1342        desc->offset += len;
1343        transport->tcp_offset += len;
1344        dprintk("RPC:       discarded %Zu bytes\n", len);
1345        xs_tcp_check_fraghdr(transport);
1346}
1347
1348static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1349{
1350        struct rpc_xprt *xprt = rd_desc->arg.data;
1351        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1352        struct xdr_skb_reader desc = {
1353                .skb    = skb,
1354                .offset = offset,
1355                .count  = len,
1356        };
1357
1358        dprintk("RPC:       xs_tcp_data_recv started\n");
1359        do {
1360                /* Read in a new fragment marker if necessary */
1361                /* Can we ever really expect to get completely empty fragments? */
1362                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1363                        xs_tcp_read_fraghdr(xprt, &desc);
1364                        continue;
1365                }
1366                /* Read in the xid if necessary */
1367                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1368                        xs_tcp_read_xid(transport, &desc);
1369                        continue;
1370                }
1371                /* Read in the call/reply flag */
1372                if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1373                        xs_tcp_read_calldir(transport, &desc);
1374                        continue;
1375                }
1376                /* Read in the request data */
1377                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1378                        xs_tcp_read_data(xprt, &desc);
1379                        continue;
1380                }
1381                /* Skip over any trailing bytes on short reads */
1382                xs_tcp_read_discard(transport, &desc);
1383        } while (desc.count);
1384        dprintk("RPC:       xs_tcp_data_recv done\n");
1385        return len - desc.count;
1386}
1387
1388/**
1389 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1390 * @sk: socket with data to read
1391 * @bytes: how much data to read
1392 *
1393 */
1394static void xs_tcp_data_ready(struct sock *sk, int bytes)
1395{
1396        struct rpc_xprt *xprt;
1397        read_descriptor_t rd_desc;
1398        int read;
1399
1400        dprintk("RPC:       xs_tcp_data_ready...\n");
1401
1402        read_lock_bh(&sk->sk_callback_lock);
1403        if (!(xprt = xprt_from_sock(sk)))
1404                goto out;
1405        if (xprt->shutdown)
1406                goto out;
1407
1408        /* Any data means we had a useful conversation, so
1409         * the we don't need to delay the next reconnect
1410         */
1411        if (xprt->reestablish_timeout)
1412                xprt->reestablish_timeout = 0;
1413
1414        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1415        rd_desc.arg.data = xprt;
1416        do {
1417                rd_desc.count = 65536;
1418                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1419        } while (read > 0);
1420out:
1421        read_unlock_bh(&sk->sk_callback_lock);
1422}
1423
1424/*
1425 * Do the equivalent of linger/linger2 handling for dealing with
1426 * broken servers that don't close the socket in a timely
1427 * fashion
1428 */
1429static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1430                unsigned long timeout)
1431{
1432        struct sock_xprt *transport;
1433
1434        if (xprt_test_and_set_connecting(xprt))
1435                return;
1436        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1437        transport = container_of(xprt, struct sock_xprt, xprt);
1438        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1439                           timeout);
1440}
1441
1442static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1443{
1444        struct sock_xprt *transport;
1445
1446        transport = container_of(xprt, struct sock_xprt, xprt);
1447
1448        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1449            !cancel_delayed_work(&transport->connect_worker))
1450                return;
1451        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1452        xprt_clear_connecting(xprt);
1453}
1454
1455static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1456{
1457        smp_mb__before_clear_bit();
1458        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1459        clear_bit(XPRT_CLOSING, &xprt->state);
1460        smp_mb__after_clear_bit();
1461        /* Mark transport as closed and wake up all pending tasks */
1462        xprt_disconnect_done(xprt);
1463}
1464
1465/**
1466 * xs_tcp_state_change - callback to handle TCP socket state changes
1467 * @sk: socket whose state has changed
1468 *
1469 */
1470static void xs_tcp_state_change(struct sock *sk)
1471{
1472        struct rpc_xprt *xprt;
1473
1474        read_lock_bh(&sk->sk_callback_lock);
1475        if (!(xprt = xprt_from_sock(sk)))
1476                goto out;
1477        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1478        dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1479                        sk->sk_state, xprt_connected(xprt),
1480                        sock_flag(sk, SOCK_DEAD),
1481                        sock_flag(sk, SOCK_ZAPPED),
1482                        sk->sk_shutdown);
1483
1484        switch (sk->sk_state) {
1485        case TCP_ESTABLISHED:
1486                spin_lock(&xprt->transport_lock);
1487                if (!xprt_test_and_set_connected(xprt)) {
1488                        struct sock_xprt *transport = container_of(xprt,
1489                                        struct sock_xprt, xprt);
1490
1491                        /* Reset TCP record info */
1492                        transport->tcp_offset = 0;
1493                        transport->tcp_reclen = 0;
1494                        transport->tcp_copied = 0;
1495                        transport->tcp_flags =
1496                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1497
1498                        xprt_wake_pending_tasks(xprt, -EAGAIN);
1499                }
1500                spin_unlock(&xprt->transport_lock);
1501                break;
1502        case TCP_FIN_WAIT1:
1503                /* The client initiated a shutdown of the socket */
1504                xprt->connect_cookie++;
1505                xprt->reestablish_timeout = 0;
1506                set_bit(XPRT_CLOSING, &xprt->state);
1507                smp_mb__before_clear_bit();
1508                clear_bit(XPRT_CONNECTED, &xprt->state);
1509                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1510                smp_mb__after_clear_bit();
1511                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1512                break;
1513        case TCP_CLOSE_WAIT:
1514                /* The server initiated a shutdown of the socket */
1515                xprt_force_disconnect(xprt);
1516                xprt->connect_cookie++;
1517        case TCP_CLOSING:
1518                /*
1519                 * If the server closed down the connection, make sure that
1520                 * we back off before reconnecting
1521                 */
1522                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1523                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1524                break;
1525        case TCP_LAST_ACK:
1526                set_bit(XPRT_CLOSING, &xprt->state);
1527                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1528                smp_mb__before_clear_bit();
1529                clear_bit(XPRT_CONNECTED, &xprt->state);
1530                smp_mb__after_clear_bit();
1531                break;
1532        case TCP_CLOSE:
1533                xs_tcp_cancel_linger_timeout(xprt);
1534                xs_sock_mark_closed(xprt);
1535        }
1536 out:
1537        read_unlock_bh(&sk->sk_callback_lock);
1538}
1539
1540/**
1541 * xs_error_report - callback mainly for catching socket errors
1542 * @sk: socket
1543 */
1544static void xs_error_report(struct sock *sk)
1545{
1546        struct rpc_xprt *xprt;
1547
1548        read_lock_bh(&sk->sk_callback_lock);
1549        if (!(xprt = xprt_from_sock(sk)))
1550                goto out;
1551        dprintk("RPC:       %s client %p...\n"
1552                        "RPC:       error %d\n",
1553                        __func__, xprt, sk->sk_err);
1554        xprt_wake_pending_tasks(xprt, -EAGAIN);
1555out:
1556        read_unlock_bh(&sk->sk_callback_lock);
1557}
1558
1559static void xs_write_space(struct sock *sk)
1560{
1561        struct socket *sock;
1562        struct rpc_xprt *xprt;
1563
1564        if (unlikely(!(sock = sk->sk_socket)))
1565                return;
1566        clear_bit(SOCK_NOSPACE, &sock->flags);
1567
1568        if (unlikely(!(xprt = xprt_from_sock(sk))))
1569                return;
1570        if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1571                return;
1572
1573        xprt_write_space(xprt);
1574}
1575
1576/**
1577 * xs_udp_write_space - callback invoked when socket buffer space
1578 *                             becomes available
1579 * @sk: socket whose state has changed
1580 *
1581 * Called when more output buffer space is available for this socket.
1582 * We try not to wake our writers until they can make "significant"
1583 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1584 * with a bunch of small requests.
1585 */
1586static void xs_udp_write_space(struct sock *sk)
1587{
1588        read_lock_bh(&sk->sk_callback_lock);
1589
1590        /* from net/core/sock.c:sock_def_write_space */
1591        if (sock_writeable(sk))
1592                xs_write_space(sk);
1593
1594        read_unlock_bh(&sk->sk_callback_lock);
1595}
1596
1597/**
1598 * xs_tcp_write_space - callback invoked when socket buffer space
1599 *                             becomes available
1600 * @sk: socket whose state has changed
1601 *
1602 * Called when more output buffer space is available for this socket.
1603 * We try not to wake our writers until they can make "significant"
1604 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1605 * with a bunch of small requests.
1606 */
1607static void xs_tcp_write_space(struct sock *sk)
1608{
1609        read_lock_bh(&sk->sk_callback_lock);
1610
1611        /* from net/core/stream.c:sk_stream_write_space */
1612        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1613                xs_write_space(sk);
1614
1615        read_unlock_bh(&sk->sk_callback_lock);
1616}
1617
1618static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1619{
1620        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1621        struct sock *sk = transport->inet;
1622
1623        if (transport->rcvsize) {
1624                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1625                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1626        }
1627        if (transport->sndsize) {
1628                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1629                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1630                sk->sk_write_space(sk);
1631        }
1632}
1633
1634/**
1635 * xs_udp_set_buffer_size - set send and receive limits
1636 * @xprt: generic transport
1637 * @sndsize: requested size of send buffer, in bytes
1638 * @rcvsize: requested size of receive buffer, in bytes
1639 *
1640 * Set socket send and receive buffer size limits.
1641 */
1642static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1643{
1644        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1645
1646        transport->sndsize = 0;
1647        if (sndsize)
1648                transport->sndsize = sndsize + 1024;
1649        transport->rcvsize = 0;
1650        if (rcvsize)
1651                transport->rcvsize = rcvsize + 1024;
1652
1653        xs_udp_do_set_buffer_size(xprt);
1654}
1655
1656/**
1657 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1658 * @task: task that timed out
1659 *
1660 * Adjust the congestion window after a retransmit timeout has occurred.
1661 */
1662static void xs_udp_timer(struct rpc_task *task)
1663{
1664        xprt_adjust_cwnd(task, -ETIMEDOUT);
1665}
1666
1667static unsigned short xs_get_random_port(void)
1668{
1669        unsigned short range = xprt_max_resvport - xprt_min_resvport;
1670        unsigned short rand = (unsigned short) net_random() % range;
1671        return rand + xprt_min_resvport;
1672}
1673
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 * Stores @port in the transport's address and refreshes the peer port
 * information derived from it.
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
        dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

        rpc_set_port(xs_addr(xprt), port);

        xs_update_peer_port(xprt);
}
1687
1688static unsigned short xs_get_srcport(struct sock_xprt *transport)
1689{
1690        unsigned short port = transport->srcport;
1691
1692        if (port == 0 && transport->xprt.resvport)
1693                port = xs_get_random_port();
1694        return port;
1695}
1696
1697static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1698{
1699        if (transport->srcport != 0)
1700                transport->srcport = 0;
1701        if (!transport->xprt.resvport)
1702                return 0;
1703        if (port <= xprt_min_resvport || port > xprt_max_resvport)
1704                return xprt_max_resvport;
1705        return --port;
1706}
1707static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1708{
1709        struct sockaddr_storage myaddr;
1710        int err, nloop = 0;
1711        unsigned short port = xs_get_srcport(transport);
1712        unsigned short last;
1713
1714        memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1715        do {
1716                rpc_set_port((struct sockaddr *)&myaddr, port);
1717                err = kernel_bind(sock, (struct sockaddr *)&myaddr,
1718                                transport->xprt.addrlen);
1719                if (port == 0)
1720                        break;
1721                if (err == 0) {
1722                        transport->srcport = port;
1723                        break;
1724                }
1725                last = port;
1726                port = xs_next_srcport(transport, port);
1727                if (port > last)
1728                        nloop++;
1729        } while (err == -EADDRINUSE && nloop != 2);
1730
1731        if (myaddr.ss_family == AF_INET)
1732                dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
1733                                &((struct sockaddr_in *)&myaddr)->sin_addr,
1734                                port, err ? "failed" : "ok", err);
1735        else
1736                dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
1737                                &((struct sockaddr_in6 *)&myaddr)->sin6_addr,
1738                                port, err ? "failed" : "ok", err);
1739        return err;
1740}
1741
1742/*
1743 * We don't support autobind on AF_LOCAL sockets
1744 */
1745static void xs_local_rpcbind(struct rpc_task *task)
1746{
1747        xprt_set_bound(task->tk_xprt);
1748}
1749
/* The set_port operation for AF_LOCAL transports: deliberately does nothing. */
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	/* Intentionally empty. */
}
1753
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock class keys for RPC sockets, one slot per address family:
 * [0] AF_INET, [1] AF_INET6, [2] AF_LOCAL.
 *
 * Each family registers its socket locks under a different class name, so
 * each family needs a key of its own; sharing one key between two names
 * would register the same lock class under conflicting names.
 */
static struct lock_class_key xs_key[3];
static struct lock_class_key xs_slock_key[3];

/* Give the locks of an AF_LOCAL RPC socket their RPC-specific lock class. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[2], "sk_lock-AF_LOCAL-RPC", &xs_key[2]);
}

/* Give the locks of an AF_INET RPC socket their RPC-specific lock class. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Give the locks of an AF_INET6 RPC socket their RPC-specific lock class. */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}

/*
 * Dispatch to the per-family reclassify helper.  Families other than
 * AF_LOCAL, AF_INET and AF_INET6 are left untouched.
 */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC the reclassify helpers are empty stubs. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif
1816
1817static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1818                struct sock_xprt *transport, int family, int type, int protocol)
1819{
1820        struct socket *sock;
1821        int err;
1822
1823        err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1824        if (err < 0) {
1825                dprintk("RPC:       can't create %d transport socket (%d).\n",
1826                                protocol, -err);
1827                goto out;
1828        }
1829        xs_reclassify_socket(family, sock);
1830
1831        err = xs_bind(transport, sock);
1832        if (err) {
1833                sock_release(sock);
1834                goto out;
1835        }
1836
1837        return sock;
1838out:
1839        return ERR_PTR(err);
1840}
1841
1842static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1843                                      struct socket *sock)
1844{
1845        struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1846                                                                        xprt);
1847
1848        if (!transport->inet) {
1849                struct sock *sk = sock->sk;
1850
1851                write_lock_bh(&sk->sk_callback_lock);
1852
1853                xs_save_old_callbacks(transport, sk);
1854
1855                sk->sk_user_data = xprt;
1856                sk->sk_data_ready = xs_local_data_ready;
1857                sk->sk_write_space = xs_udp_write_space;
1858                sk->sk_error_report = xs_error_report;
1859                sk->sk_allocation = GFP_ATOMIC;
1860
1861                xprt_clear_connected(xprt);
1862
1863                /* Reset to new socket */
1864                transport->sock = sock;
1865                transport->inet = sk;
1866
1867                write_unlock_bh(&sk->sk_callback_lock);
1868        }
1869
1870        /* Tell the socket layer to start connecting... */
1871        xprt->stat.connect_count++;
1872        xprt->stat.connect_start = jiffies;
1873        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1874}
1875
1876/**
1877 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
1878 * @xprt: RPC transport to connect
1879 * @transport: socket transport to connect
1880 * @create_sock: function to create a socket of the correct type
1881 *
1882 * Invoked by a work queue tasklet.
1883 */
1884static void xs_local_setup_socket(struct work_struct *work)
1885{
1886        struct sock_xprt *transport =
1887                container_of(work, struct sock_xprt, connect_worker.work);
1888        struct rpc_xprt *xprt = &transport->xprt;
1889        struct socket *sock;
1890        int status = -EIO;
1891
1892        if (xprt->shutdown)
1893                goto out;
1894
1895        current->flags |= PF_FSTRANS;
1896
1897        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1898        status = __sock_create(xprt->xprt_net, AF_LOCAL,
1899                                        SOCK_STREAM, 0, &sock, 1);
1900        if (status < 0) {
1901                dprintk("RPC:       can't create AF_LOCAL "
1902                        "transport socket (%d).\n", -status);
1903                goto out;
1904        }
1905        xs_reclassify_socketu(sock);
1906
1907        dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
1908                        xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1909
1910        status = xs_local_finish_connecting(xprt, sock);
1911        switch (status) {
1912        case 0:
1913                dprintk("RPC:       xprt %p connected to %s\n",
1914                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1915                xprt_set_connected(xprt);
1916                break;
1917        case -ENOENT:
1918                dprintk("RPC:       xprt %p: socket %s does not exist\n",
1919                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1920                break;
1921        default:
1922                printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
1923                                __func__, -status,
1924                                xprt->address_strings[RPC_DISPLAY_ADDR]);
1925        }
1926
1927out:
1928        xprt_clear_connecting(xprt);
1929        xprt_wake_pending_tasks(xprt, status);
1930        current->flags &= ~PF_FSTRANS;
1931}
1932
1933#ifdef CONFIG_SUNRPC_SWAP
1934static void xs_set_memalloc(struct rpc_xprt *xprt)
1935{
1936        struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1937                        xprt);
1938
1939        if (xprt->swapper)
1940                sk_set_memalloc(transport->inet);
1941}
1942
1943/**
1944 * xs_swapper - Tag this transport as being used for swap.
1945 * @xprt: transport to tag
1946 * @enable: enable/disable
1947 *
1948 */
1949int xs_swapper(struct rpc_xprt *xprt, int enable)
1950{
1951        struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1952                        xprt);
1953        int err = 0;
1954
1955        if (enable) {
1956                xprt->swapper++;
1957                xs_set_memalloc(xprt);
1958        } else if (xprt->swapper) {
1959                xprt->swapper--;
1960                sk_clear_memalloc(transport->inet);
1961        }
1962
1963        return err;
1964}
1965EXPORT_SYMBOL_GPL(xs_swapper);
1966#else
/* Stub used when CONFIG_SUNRPC_SWAP is not set: there is nothing to mark. */
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
	/* Intentionally empty. */
}
1970#endif
1971
1972static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1973{
1974        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1975
1976        if (!transport->inet) {
1977                struct sock *sk = sock->sk;
1978
1979                write_lock_bh(&sk->sk_callback_lock);
1980
1981                xs_save_old_callbacks(transport, sk);
1982
1983                sk->sk_user_data = xprt;
1984                sk->sk_data_ready = xs_udp_data_ready;
1985                sk->sk_write_space = xs_udp_write_space;
1986                sk->sk_error_report = xs_error_report;
1987                sk->sk_no_check = UDP_CSUM_NORCV;
1988                sk->sk_allocation = GFP_ATOMIC;
1989
1990                xprt_set_connected(xprt);
1991
1992                /* Reset to new socket */
1993                transport->sock = sock;
1994                transport->inet = sk;
1995
1996                xs_set_memalloc(xprt);
1997
1998                write_unlock_bh(&sk->sk_callback_lock);
1999        }
2000        xs_udp_do_set_buffer_size(xprt);
2001}
2002
2003static void xs_udp_setup_socket(struct work_struct *work)
2004{
2005        struct sock_xprt *transport =
2006                container_of(work, struct sock_xprt, connect_worker.work);
2007        struct rpc_xprt *xprt = &transport->xprt;
2008        struct socket *sock = transport->sock;
2009        int status = -EIO;
2010
2011        if (xprt->shutdown)
2012                goto out;
2013
2014        current->flags |= PF_FSTRANS;
2015
2016        /* Start by resetting any existing state */
2017        xs_reset_transport(transport);
2018        sock = xs_create_sock(xprt, transport,
2019                        xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
2020        if (IS_ERR(sock))
2021                goto out;
2022
2023        dprintk("RPC:       worker connecting xprt %p via %s to "
2024                                "%s (port %s)\n", xprt,
2025                        xprt->address_strings[RPC_DISPLAY_PROTO],
2026                        xprt->address_strings[RPC_DISPLAY_ADDR],
2027                        xprt->address_strings[RPC_DISPLAY_PORT]);
2028
2029        xs_udp_finish_connecting(xprt, sock);
2030        status = 0;
2031out:
2032        xprt_clear_connecting(xprt);
2033        xprt_wake_pending_tasks(xprt, status);
2034        current->flags &= ~PF_FSTRANS;
2035}
2036
2037/*
2038 * We need to preserve the port number so the reply cache on the server can
2039 * find our cached RPC replies when we get around to reconnecting.
2040 */
2041static void xs_abort_connection(struct sock_xprt *transport)
2042{
2043        int result;
2044        struct sockaddr any;
2045
2046        dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);
2047
2048        /*
2049         * Disconnect the transport socket by doing a connect operation
2050         * with AF_UNSPEC.  This should return immediately...
2051         */
2052        memset(&any, 0, sizeof(any));
2053        any.sa_family = AF_UNSPEC;
2054        result = kernel_connect(transport->sock, &any, sizeof(any), 0);
2055        if (!result)
2056                xs_sock_mark_closed(&transport->xprt);
2057        else
2058                dprintk("RPC:       AF_UNSPEC connect return code %d\n",
2059                                result);
2060}
2061
2062static void xs_tcp_reuse_connection(struct sock_xprt *transport)
2063{
2064        unsigned int state = transport->inet->sk_state;
2065
2066        if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
2067                /* we don't need to abort the connection if the socket
2068                 * hasn't undergone a shutdown
2069                 */
2070                if (transport->inet->sk_shutdown == 0)
2071                        return;
2072                dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
2073                                __func__, transport->inet->sk_shutdown);
2074        }
2075        if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
2076                /* we don't need to abort the connection if the socket
2077                 * hasn't undergone a shutdown
2078                 */
2079                if (transport->inet->sk_shutdown == 0)
2080                        return;
2081                dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
2082                                "sk_shutdown set to %d\n",
2083                                __func__, transport->inet->sk_shutdown);
2084        }
2085        xs_abort_connection(transport);
2086}
2087
2088static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2089{
2090        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2091        int ret = -ENOTCONN;
2092
2093        if (!transport->inet) {
2094                struct sock *sk = sock->sk;
2095
2096                write_lock_bh(&sk->sk_callback_lock);
2097
2098                xs_save_old_callbacks(transport, sk);
2099
2100                sk->sk_user_data = xprt;
2101                sk->sk_data_ready = xs_tcp_data_ready;
2102                sk->sk_state_change = xs_tcp_state_change;
2103                sk->sk_write_space = xs_tcp_write_space;
2104                sk->sk_error_report = xs_error_report;
2105                sk->sk_allocation = GFP_ATOMIC;
2106
2107                /* socket options */
2108                sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
2109                sock_reset_flag(sk, SOCK_LINGER);
2110                tcp_sk(sk)->linger2 = 0;
2111                tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
2112
2113                xprt_clear_connected(xprt);
2114
2115                /* Reset to new socket */
2116                transport->sock = sock;
2117                transport->inet = sk;
2118
2119                write_unlock_bh(&sk->sk_callback_lock);
2120        }
2121
2122        if (!xprt_bound(xprt))
2123                goto out;
2124
2125        xs_set_memalloc(xprt);
2126
2127        /* Tell the socket layer to start connecting... */
2128        xprt->stat.connect_count++;
2129        xprt->stat.connect_start = jiffies;
2130        ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2131        switch (ret) {
2132        case 0:
2133        case -EINPROGRESS:
2134                /* SYN_SENT! */
2135                xprt->connect_cookie++;
2136                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2137                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2138        }
2139out:
2140        return ret;
2141}
2142
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: the connect_worker work item embedded in the socket transport
 *
 * Invoked from a work queue.  Uses the transport's existing socket when
 * there is one (after xs_tcp_reuse_connection()), otherwise creates a new
 * one, and then starts the connect through xs_tcp_finish_connecting().
 *
 * For the statuses that mean a connect is in progress or will be retried on
 * the same socket, only the "connecting" state is cleared and the pending
 * tasks are not woken here.  In all other cases the pending tasks are woken
 * with -EIO (shutdown), the socket creation error, -EINVAL, or -EAGAIN.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	current->flags |= PF_FSTRANS;

	if (sock != NULL) {
		int abort_requested;

		abort_requested = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_requested)
			goto out_eagain;
	} else {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p via %s to %s (port %s)\n",
			xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		/* pending tasks are deliberately not woken on this path */
		xprt_clear_connecting(xprt);
		current->flags &= ~PF_FSTRANS;
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through: handled like -EADDRNOTAVAIL */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
2228
2229/**
2230 * xs_connect - connect a socket to a remote endpoint
2231 * @task: address of RPC task that manages state of connect request
2232 *
2233 * TCP: If the remote end dropped the connection, delay reconnecting.
2234 *
2235 * UDP socket connects are synchronous, but we use a work queue anyway
2236 * to guarantee that even unprivileged user processes can set up a
2237 * socket on a privileged port.
2238 *
2239 * If a UDP socket connect fails, the delay behavior here prevents
2240 * retry floods (hard mounts).
2241 */
2242static void xs_connect(struct rpc_task *task)
2243{
2244        struct rpc_xprt *xprt = task->tk_xprt;
2245        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2246
2247        if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2248                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2249                                "seconds\n",
2250                                xprt, xprt->reestablish_timeout / HZ);
2251                queue_delayed_work(rpciod_workqueue,
2252                                   &transport->connect_worker,
2253                                   xprt->reestablish_timeout);
2254                xprt->reestablish_timeout <<= 1;
2255                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2256                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2257                if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2258                        xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2259        } else {
2260                dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2261                queue_delayed_work(rpciod_workqueue,
2262                                   &transport->connect_worker, 0);
2263        }
2264}
2265
2266/**
2267 * xs_local_print_stats - display AF_LOCAL socket-specifc stats
2268 * @xprt: rpc_xprt struct containing statistics
2269 * @seq: output file
2270 *
2271 */
2272static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2273{
2274        long idle_time = 0;
2275
2276        if (xprt_connected(xprt))
2277                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2278
2279        seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
2280                        "%llu %llu %lu %llu %llu\n",
2281                        xprt->stat.bind_count,
2282                        xprt->stat.connect_count,
2283                        xprt->stat.connect_time,
2284                        idle_time,
2285                        xprt->stat.sends,
2286                        xprt->stat.recvs,
2287                        xprt->stat.bad_xids,
2288                        xprt->stat.req_u,
2289                        xprt->stat.bklog_u,
2290                        xprt->stat.max_slots,
2291                        xprt->stat.sending_u,
2292                        xprt->stat.pending_u);
2293}
2294
2295/**
2296 * xs_udp_print_stats - display UDP socket-specifc stats
2297 * @xprt: rpc_xprt struct containing statistics
2298 * @seq: output file
2299 *
2300 */
2301static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2302{
2303        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2304
2305        seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu "
2306                        "%lu %llu %llu\n",
2307                        transport->srcport,
2308                        xprt->stat.bind_count,
2309                        xprt->stat.sends,
2310                        xprt->stat.recvs,
2311                        xprt->stat.bad_xids,
2312                        xprt->stat.req_u,
2313                        xprt->stat.bklog_u,
2314                        xprt->stat.max_slots,
2315                        xprt->stat.sending_u,
2316                        xprt->stat.pending_u);
2317}
2318
2319/**
2320 * xs_tcp_print_stats - display TCP socket-specifc stats
2321 * @xprt: rpc_xprt struct containing statistics
2322 * @seq: output file
2323 *
2324 */
2325static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2326{
2327        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2328        long idle_time = 0;
2329
2330        if (xprt_connected(xprt))
2331                idle_time = (long)(jiffies - xprt->last_used) / HZ;
2332
2333        seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu "
2334                        "%llu %llu %lu %llu %llu\n",
2335                        transport->srcport,
2336                        xprt->stat.bind_count,
2337                        xprt->stat.connect_count,
2338                        xprt->stat.connect_time,
2339                        idle_time,
2340                        xprt->stat.sends,
2341                        xprt->stat.recvs,
2342                        xprt->stat.bad_xids,
2343                        xprt->stat.req_u,
2344                        xprt->stat.bklog_u,
2345                        xprt->stat.max_slots,
2346                        xprt->stat.sending_u,
2347                        xprt->stat.pending_u);
2348}
2349
2350/*
2351 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2352 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2353 * to use the server side send routines.
2354 */
2355static void *bc_malloc(struct rpc_task *task, size_t size)
2356{
2357        struct page *page;
2358        struct rpc_buffer *buf;
2359
2360        BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2361        page = alloc_page(GFP_KERNEL);
2362
2363        if (!page)
2364                return NULL;
2365
2366        buf = page_address(page);
2367        buf->len = PAGE_SIZE;
2368
2369        return buf->data;
2370}
2371
2372/*
2373 * Free the space allocated in the bc_alloc routine
2374 */
2375static void bc_free(void *buffer)
2376{
2377        struct rpc_buffer *buf;
2378
2379        if (!buffer)
2380                return;
2381
2382        buf = container_of(buffer, struct rpc_buffer, data);
2383        free_page((unsigned long)buf);
2384}
2385
2386/*
2387 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2388 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2389 */
2390static int bc_sendto(struct rpc_rqst *req)
2391{
2392        int len;
2393        struct xdr_buf *xbufp = &req->rq_snd_buf;
2394        struct rpc_xprt *xprt = req->rq_xprt;
2395        struct sock_xprt *transport =
2396                                container_of(xprt, struct sock_xprt, xprt);
2397        struct socket *sock = transport->sock;
2398        unsigned long headoff;
2399        unsigned long tailoff;
2400
2401        xs_encode_stream_record_marker(xbufp);
2402
2403        tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2404        headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2405        len = svc_send_common(sock, xbufp,
2406                              virt_to_page(xbufp->head[0].iov_base), headoff,
2407                              xbufp->tail[0].iov_base, tailoff);
2408
2409        if (len != xbufp->len) {
2410                printk(KERN_NOTICE "Error sending entire callback!\n");
2411                len = -EAGAIN;
2412        }
2413
2414        return len;
2415}
2416
2417/*
2418 * The send routine. Borrows from svc_send
2419 */
2420static int bc_send_request(struct rpc_task *task)
2421{
2422        struct rpc_rqst *req = task->tk_rqstp;
2423        struct svc_xprt *xprt;
2424        struct svc_sock         *svsk;
2425        u32                     len;
2426
2427        dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2428        /*
2429         * Get the server socket associated with this callback xprt
2430         */
2431        xprt = req->rq_xprt->bc_xprt;
2432        svsk = container_of(xprt, struct svc_sock, sk_xprt);
2433
2434        /*
2435         * Grab the mutex to serialize data as the connection is shared
2436         * with the fore channel
2437         */
2438        if (!mutex_trylock(&xprt->xpt_mutex)) {
2439                rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2440                if (!mutex_trylock(&xprt->xpt_mutex))
2441                        return -EAGAIN;
2442                rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2443        }
2444        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2445                len = -ENOTCONN;
2446        else
2447                len = bc_sendto(req);
2448        mutex_unlock(&xprt->xpt_mutex);
2449
2450        if (len > 0)
2451                len = 0;
2452
2453        return len;
2454}
2455
/*
 * The close routine of the backchannel transport. Since the connection is
 * client initiated, there is nothing to do here.
 */
static void bc_close(struct rpc_xprt *xprt)
{
	/* Intentionally empty. */
}
2463
/*
 * The destroy routine of the backchannel transport. Again, because the
 * connection is client initiated, there is nothing to do here.
 */
static void bc_destroy(struct rpc_xprt *xprt)
{
	/* Intentionally empty. */
}
2472
2473static struct rpc_xprt_ops xs_local_ops = {
2474        .reserve_xprt           = xprt_reserve_xprt,
2475        .release_xprt           = xs_tcp_release_xprt,
2476        .alloc_slot             = xprt_alloc_slot,
2477        .rpcbind                = xs_local_rpcbind,
2478        .set_port               = xs_local_set_port,
2479        .connect                = xs_connect,
2480        .buf_alloc              = rpc_malloc,
2481        .buf_free               = rpc_free,
2482        .send_request           = xs_local_send_request,
2483        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2484        .close                  = xs_close,
2485        .destroy                = xs_destroy,
2486        .print_stats            = xs_local_print_stats,
2487};
2488
2489static struct rpc_xprt_ops xs_udp_ops = {
2490        .set_buffer_size        = xs_udp_set_buffer_size,
2491        .reserve_xprt           = xprt_reserve_xprt_cong,
2492        .release_xprt           = xprt_release_xprt_cong,
2493        .alloc_slot             = xprt_alloc_slot,
2494        .rpcbind                = rpcb_getport_async,
2495        .set_port               = xs_set_port,
2496        .connect                = xs_connect,
2497        .buf_alloc              = rpc_malloc,
2498        .buf_free               = rpc_free,
2499        .send_request           = xs_udp_send_request,
2500        .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2501        .timer                  = xs_udp_timer,
2502        .release_request        = xprt_release_rqst_cong,
2503        .close                  = xs_close,
2504        .destroy                = xs_destroy,
2505        .print_stats            = xs_udp_print_stats,
2506};
2507
2508static struct rpc_xprt_ops xs_tcp_ops = {
2509        .reserve_xprt           = xprt_reserve_xprt,
2510        .release_xprt           = xs_tcp_release_xprt,
2511        .alloc_slot             = xprt_lock_and_alloc_slot,
2512        .rpcbind                = rpcb_getport_async,
2513        .set_port               = xs_set_port,
2514        .connect                = xs_connect,
2515        .buf_alloc              = rpc_malloc,
2516        .buf_free               = rpc_free,
2517        .send_request           = xs_tcp_send_request,
2518        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2519        .close                  = xs_tcp_close,
2520        .destroy                = xs_destroy,
2521        .print_stats            = xs_tcp_print_stats,
2522};
2523
2524/*
2525 * The rpc_xprt_ops for the server backchannel
2526 */
2527
2528static struct rpc_xprt_ops bc_tcp_ops = {
2529        .reserve_xprt           = xprt_reserve_xprt,
2530        .release_xprt           = xprt_release_xprt,
2531        .rpcbind                = xs_local_rpcbind,
2532        .buf_alloc              = bc_malloc,
2533        .buf_free               = bc_free,
2534        .send_request           = bc_send_request,
2535        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2536        .close                  = bc_close,
2537        .destroy                = bc_destroy,
2538        .print_stats            = xs_tcp_print_stats,
2539};
2540
2541static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2542{
2543        static const struct sockaddr_in sin = {
2544                .sin_family             = AF_INET,
2545                .sin_addr.s_addr        = htonl(INADDR_ANY),
2546        };
2547        static const struct sockaddr_in6 sin6 = {
2548                .sin6_family            = AF_INET6,
2549                .sin6_addr              = IN6ADDR_ANY_INIT,
2550        };
2551
2552        switch (family) {
2553        case AF_LOCAL:
2554                break;
2555        case AF_INET:
2556                memcpy(sap, &sin, sizeof(sin));
2557                break;
2558        case AF_INET6:
2559                memcpy(sap, &sin6, sizeof(sin6));
2560                break;
2561        default:
2562                dprintk("RPC:       %s: Bad address family\n", __func__);
2563                return -EAFNOSUPPORT;
2564        }
2565        return 0;
2566}
2567
2568static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2569                                      unsigned int slot_table_size,
2570                                      unsigned int max_slot_table_size)
2571{
2572        struct rpc_xprt *xprt;
2573        struct sock_xprt *new;
2574
2575        if (args->addrlen > sizeof(xprt->addr)) {
2576                dprintk("RPC:       xs_setup_xprt: address too large\n");
2577                return ERR_PTR(-EBADF);
2578        }
2579
2580        xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2581                        max_slot_table_size);
2582        if (xprt == NULL) {
2583                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2584                                "rpc_xprt\n");
2585                return ERR_PTR(-ENOMEM);
2586        }
2587
2588        new = container_of(xprt, struct sock_xprt, xprt);
2589        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2590        xprt->addrlen = args->addrlen;
2591        if (args->srcaddr)
2592                memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2593        else {
2594                int err;
2595                err = xs_init_anyaddr(args->dstaddr->sa_family,
2596                                        (struct sockaddr *)&new->srcaddr);
2597                if (err != 0) {
2598                        xprt_free(xprt);
2599                        return ERR_PTR(err);
2600                }
2601        }
2602
2603        return xprt;
2604}
2605
2606static const struct rpc_timeout xs_local_default_timeout = {
2607        .to_initval = 10 * HZ,
2608        .to_maxval = 10 * HZ,
2609        .to_retries = 2,
2610};
2611
2612/**
2613 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2614 * @args: rpc transport creation arguments
2615 *
2616 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2617 */
2618static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2619{
2620        struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2621        struct sock_xprt *transport;
2622        struct rpc_xprt *xprt;
2623        struct rpc_xprt *ret;
2624
2625        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2626                        xprt_max_tcp_slot_table_entries);
2627        if (IS_ERR(xprt))
2628                return xprt;
2629        transport = container_of(xprt, struct sock_xprt, xprt);
2630
2631        xprt->prot = 0;
2632        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2633        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2634
2635        xprt->bind_timeout = XS_BIND_TO;
2636        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2637        xprt->idle_timeout = XS_IDLE_DISC_TO;
2638
2639        xprt->ops = &xs_local_ops;
2640        xprt->timeout = &xs_local_default_timeout;
2641
2642        switch (sun->sun_family) {
2643        case AF_LOCAL:
2644                if (sun->sun_path[0] != '/') {
2645                        dprintk("RPC:       bad AF_LOCAL address: %s\n",
2646                                        sun->sun_path);
2647                        ret = ERR_PTR(-EINVAL);
2648                        goto out_err;
2649                }
2650                xprt_set_bound(xprt);
2651                INIT_DELAYED_WORK(&transport->connect_worker,
2652                                        xs_local_setup_socket);
2653                xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2654                break;
2655        default:
2656                ret = ERR_PTR(-EAFNOSUPPORT);
2657                goto out_err;
2658        }
2659
2660        dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2661                        xprt->address_strings[RPC_DISPLAY_ADDR]);
2662
2663        if (try_module_get(THIS_MODULE))
2664                return xprt;
2665        ret = ERR_PTR(-EINVAL);
2666out_err:
2667        xprt_free(xprt);
2668        return ret;
2669}
2670
2671static const struct rpc_timeout xs_udp_default_timeout = {
2672        .to_initval = 5 * HZ,
2673        .to_maxval = 30 * HZ,
2674        .to_increment = 5 * HZ,
2675        .to_retries = 5,
2676};
2677
2678/**
2679 * xs_setup_udp - Set up transport to use a UDP socket
2680 * @args: rpc transport creation arguments
2681 *
2682 */
2683static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2684{
2685        struct sockaddr *addr = args->dstaddr;
2686        struct rpc_xprt *xprt;
2687        struct sock_xprt *transport;
2688        struct rpc_xprt *ret;
2689
2690        xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2691                        xprt_udp_slot_table_entries);
2692        if (IS_ERR(xprt))
2693                return xprt;
2694        transport = container_of(xprt, struct sock_xprt, xprt);
2695
2696        xprt->prot = IPPROTO_UDP;
2697        xprt->tsh_size = 0;
2698        /* XXX: header size can vary due to auth type, IPv6, etc. */
2699        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2700
2701        xprt->bind_timeout = XS_BIND_TO;
2702        xprt->reestablish_timeout = XS_UDP_REEST_TO;
2703        xprt->idle_timeout = XS_IDLE_DISC_TO;
2704
2705        xprt->ops = &xs_udp_ops;
2706
2707        xprt->timeout = &xs_udp_default_timeout;
2708
2709        switch (addr->sa_family) {
2710        case AF_INET:
2711                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2712                        xprt_set_bound(xprt);
2713
2714                INIT_DELAYED_WORK(&transport->connect_worker,
2715                                        xs_udp_setup_socket);
2716                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2717                break;
2718        case AF_INET6:
2719                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2720                        xprt_set_bound(xprt);
2721
2722                INIT_DELAYED_WORK(&transport->connect_worker,
2723                                        xs_udp_setup_socket);
2724                xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2725                break;
2726        default:
2727                ret = ERR_PTR(-EAFNOSUPPORT);
2728                goto out_err;
2729        }
2730
2731        if (xprt_bound(xprt))
2732                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2733                                xprt->address_strings[RPC_DISPLAY_ADDR],
2734                                xprt->address_strings[RPC_DISPLAY_PORT],
2735                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2736        else
2737                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2738                                xprt->address_strings[RPC_DISPLAY_ADDR],
2739                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2740
2741        if (try_module_get(THIS_MODULE))
2742                return xprt;
2743        ret = ERR_PTR(-EINVAL);
2744out_err:
2745        xprt_free(xprt);
2746        return ret;
2747}
2748
2749static const struct rpc_timeout xs_tcp_default_timeout = {
2750        .to_initval = 60 * HZ,
2751        .to_maxval = 60 * HZ,
2752        .to_retries = 2,
2753};
2754
2755/**
2756 * xs_setup_tcp - Set up transport to use a TCP socket
2757 * @args: rpc transport creation arguments
2758 *
2759 */
2760static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2761{
2762        struct sockaddr *addr = args->dstaddr;
2763        struct rpc_xprt *xprt;
2764        struct sock_xprt *transport;
2765        struct rpc_xprt *ret;
2766
2767        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2768                        xprt_max_tcp_slot_table_entries);
2769        if (IS_ERR(xprt))
2770                return xprt;
2771        transport = container_of(xprt, struct sock_xprt, xprt);
2772
2773        xprt->prot = IPPROTO_TCP;
2774        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2775        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2776
2777        xprt->bind_timeout = XS_BIND_TO;
2778        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2779        xprt->idle_timeout = XS_IDLE_DISC_TO;
2780
2781        xprt->ops = &xs_tcp_ops;
2782        xprt->timeout = &xs_tcp_default_timeout;
2783
2784        switch (addr->sa_family) {
2785        case AF_INET:
2786                if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2787                        xprt_set_bound(xprt);
2788
2789                INIT_DELAYED_WORK(&transport->connect_worker,
2790                                        xs_tcp_setup_socket);
2791                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2792                break;
2793        case AF_INET6:
2794                if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2795                        xprt_set_bound(xprt);
2796
2797                INIT_DELAYED_WORK(&transport->connect_worker,
2798                                        xs_tcp_setup_socket);
2799                xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2800                break;
2801        default:
2802                ret = ERR_PTR(-EAFNOSUPPORT);
2803                goto out_err;
2804        }
2805
2806        if (xprt_bound(xprt))
2807                dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2808                                xprt->address_strings[RPC_DISPLAY_ADDR],
2809                                xprt->address_strings[RPC_DISPLAY_PORT],
2810                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2811        else
2812                dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2813                                xprt->address_strings[RPC_DISPLAY_ADDR],
2814                                xprt->address_strings[RPC_DISPLAY_PROTO]);
2815
2816
2817        if (try_module_get(THIS_MODULE))
2818                return xprt;
2819        ret = ERR_PTR(-EINVAL);
2820out_err:
2821        xprt_free(xprt);
2822        return ret;
2823}
2824
2825/**
2826 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2827 * @args: rpc transport creation arguments
2828 *
2829 */
2830static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2831{
2832        struct sockaddr *addr = args->dstaddr;
2833        struct rpc_xprt *xprt;
2834        struct sock_xprt *transport;
2835        struct svc_sock *bc_sock;
2836        struct rpc_xprt *ret;
2837
2838        if (args->bc_xprt->xpt_bc_xprt) {
2839                /*
2840                 * This server connection already has a backchannel
2841                 * export; we can't create a new one, as we wouldn't be
2842                 * able to match replies based on xid any more.  So,
2843                 * reuse the already-existing one:
2844                 */
2845                 return args->bc_xprt->xpt_bc_xprt;
2846        }
2847        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2848                        xprt_tcp_slot_table_entries);
2849        if (IS_ERR(xprt))
2850                return xprt;
2851        transport = container_of(xprt, struct sock_xprt, xprt);
2852
2853        xprt->prot = IPPROTO_TCP;
2854        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2855        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2856        xprt->timeout = &xs_tcp_default_timeout;
2857
2858        /* backchannel */
2859        xprt_set_bound(xprt);
2860        xprt->bind_timeout = 0;
2861        xprt->reestablish_timeout = 0;
2862        xprt->idle_timeout = 0;
2863
2864        xprt->ops = &bc_tcp_ops;
2865
2866        switch (addr->sa_family) {
2867        case AF_INET:
2868                xs_format_peer_addresses(xprt, "tcp",
2869                                         RPCBIND_NETID_TCP);
2870                break;
2871        case AF_INET6:
2872                xs_format_peer_addresses(xprt, "tcp",
2873                                   RPCBIND_NETID_TCP6);
2874                break;
2875        default:
2876                ret = ERR_PTR(-EAFNOSUPPORT);
2877                goto out_err;
2878        }
2879
2880        dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2881                        xprt->address_strings[RPC_DISPLAY_ADDR],
2882                        xprt->address_strings[RPC_DISPLAY_PORT],
2883                        xprt->address_strings[RPC_DISPLAY_PROTO]);
2884
2885        /*
2886         * Once we've associated a backchannel xprt with a connection,
2887         * we want to keep it around as long as long as the connection
2888         * lasts, in case we need to start using it for a backchannel
2889         * again; this reference won't be dropped until bc_xprt is
2890         * destroyed.
2891         */
2892        xprt_get(xprt);
2893        args->bc_xprt->xpt_bc_xprt = xprt;
2894        xprt->bc_xprt = args->bc_xprt;
2895        bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2896        transport->sock = bc_sock->sk_sock;
2897        transport->inet = bc_sock->sk_sk;
2898
2899        /*
2900         * Since we don't want connections for the backchannel, we set
2901         * the xprt status to connected
2902         */
2903        xprt_set_connected(xprt);
2904
2905
2906        if (try_module_get(THIS_MODULE))
2907                return xprt;
2908        xprt_put(xprt);
2909        ret = ERR_PTR(-EINVAL);
2910out_err:
2911        xprt_free(xprt);
2912        return ret;
2913}
2914
2915static struct xprt_class        xs_local_transport = {
2916        .list           = LIST_HEAD_INIT(xs_local_transport.list),
2917        .name           = "named UNIX socket",
2918        .owner          = THIS_MODULE,
2919        .ident          = XPRT_TRANSPORT_LOCAL,
2920        .setup          = xs_setup_local,
2921};
2922
2923static struct xprt_class        xs_udp_transport = {
2924        .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2925        .name           = "udp",
2926        .owner          = THIS_MODULE,
2927        .ident          = XPRT_TRANSPORT_UDP,
2928        .setup          = xs_setup_udp,
2929};
2930
2931static struct xprt_class        xs_tcp_transport = {
2932        .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2933        .name           = "tcp",
2934        .owner          = THIS_MODULE,
2935        .ident          = XPRT_TRANSPORT_TCP,
2936        .setup          = xs_setup_tcp,
2937};
2938
2939static struct xprt_class        xs_bc_tcp_transport = {
2940        .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2941        .name           = "tcp NFSv4.1 backchannel",
2942        .owner          = THIS_MODULE,
2943        .ident          = XPRT_TRANSPORT_BC_TCP,
2944        .setup          = xs_setup_bc_tcp,
2945};
2946
2947/**
2948 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2949 *
2950 */
2951int init_socket_xprt(void)
2952{
2953#ifdef RPC_DEBUG
2954        if (!sunrpc_table_header)
2955                sunrpc_table_header = register_sysctl_table(sunrpc_table);
2956#endif
2957
2958        xprt_register_transport(&xs_local_transport);
2959        xprt_register_transport(&xs_udp_transport);
2960        xprt_register_transport(&xs_tcp_transport);
2961        xprt_register_transport(&xs_bc_tcp_transport);
2962
2963        return 0;
2964}
2965
2966/**
2967 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2968 *
2969 */
2970void cleanup_socket_xprt(void)
2971{
2972#ifdef RPC_DEBUG
2973        if (sunrpc_table_header) {
2974                unregister_sysctl_table(sunrpc_table_header);
2975                sunrpc_table_header = NULL;
2976        }
2977#endif
2978
2979        xprt_unregister_transport(&xs_local_transport);
2980        xprt_unregister_transport(&xs_udp_transport);
2981        xprt_unregister_transport(&xs_tcp_transport);
2982        xprt_unregister_transport(&xs_bc_tcp_transport);
2983}
2984
2985static int param_set_uint_minmax(const char *val,
2986                const struct kernel_param *kp,
2987                unsigned int min, unsigned int max)
2988{
2989        unsigned long num;
2990        int ret;
2991
2992        if (!val)
2993                return -EINVAL;
2994        ret = strict_strtoul(val, 0, &num);
2995        if (ret == -EINVAL || num < min || num > max)
2996                return -EINVAL;
2997        *((unsigned int *)kp->arg) = num;
2998        return 0;
2999}
3000
3001static int param_set_portnr(const char *val, const struct kernel_param *kp)
3002{
3003        return param_set_uint_minmax(val, kp,
3004                        RPC_MIN_RESVPORT,
3005                        RPC_MAX_RESVPORT);
3006}
3007
3008static struct kernel_param_ops param_ops_portnr = {
3009        .set = param_set_portnr,
3010        .get = param_get_uint,
3011};
3012
3013#define param_check_portnr(name, p) \
3014        __param_check(name, p, unsigned int);
3015
3016module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
3017module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
3018
3019static int param_set_slot_table_size(const char *val,
3020                                     const struct kernel_param *kp)
3021{
3022        return param_set_uint_minmax(val, kp,
3023                        RPC_MIN_SLOT_TABLE,
3024                        RPC_MAX_SLOT_TABLE);
3025}
3026
3027static struct kernel_param_ops param_ops_slot_table_size = {
3028        .set = param_set_slot_table_size,
3029        .get = param_get_uint,
3030};
3031
3032#define param_check_slot_table_size(name, p) \
3033        __param_check(name, p, unsigned int);
3034
3035static int param_set_max_slot_table_size(const char *val,
3036                                     const struct kernel_param *kp)
3037{
3038        return param_set_uint_minmax(val, kp,
3039                        RPC_MIN_SLOT_TABLE,
3040                        RPC_MAX_SLOT_TABLE_LIMIT);
3041}
3042
3043static struct kernel_param_ops param_ops_max_slot_table_size = {
3044        .set = param_set_max_slot_table_size,
3045        .get = param_get_uint,
3046};
3047
3048#define param_check_max_slot_table_size(name, p) \
3049        __param_check(name, p, unsigned int);
3050
3051module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
3052                   slot_table_size, 0644);
3053module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
3054                   max_slot_table_size, 0644);
3055module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
3056                   slot_table_size, 0644);
3057
3058
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.