/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *      svc_pool->sp_lock protects most of the fields of that pool.
 *      svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *      when both need to be taken (rare), svc_serv->sv_lock is first.
 *      BKL protects svc_serv->sv_nrthreads.
 *      svc_xprt->xpt_lock protects the svc_xprt->xpt_deferred list
 *             and the ->xpt_auth_cache.
 *
 *      The XPT_BUSY bit in xprt->xpt_flags prevents a transport from being
 *      enqueued more than once. During normal transport processing this bit
 *      is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *      Providers should not manipulate this bit directly.
 *
 *      Some flags can be set to certain values at any time
 *      provided that certain rules are followed:
 *
 *      XPT_CONN, XPT_DATA:
 *              - Can be set or cleared at any time.
 *              - After a set, svc_xprt_enqueue must be called to enqueue
 *                the transport for processing.
 *              - After a clear, the transport must be read/accepted.
 *                If this succeeds, it must be set again.
 *      XPT_CLOSE:
 *              - Can be set at any time. It is never cleared.
 *      XPT_DEAD:
 *              - Can only be set while XPT_BUSY is held, which ensures
 *                that no other thread will be using the transport or will
 *                try to set XPT_DEAD.
 */
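
/*
 * Illustrative sketch (not part of the original file): the typical flow
 * implied by the rules above.  A transport provider marks the transport
 * and queues it when data or a connection arrives, e.g.:
 *
 *      set_bit(XPT_DATA, &xprt->xpt_flags);
 *      svc_xprt_enqueue(xprt);
 *
 * A server thread then picks the transport up in svc_recv(), and once the
 * provider has consumed the event it calls svc_xprt_received() to clear
 * XPT_BUSY so the transport can be enqueued again.
 */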

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
        struct svc_xprt_class *cl;
        int res = -EEXIST;

        dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

        INIT_LIST_HEAD(&xcl->xcl_list);
        spin_lock(&svc_xprt_class_lock);
        /* Make sure there isn't already a class with the same name */
        list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
                if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
                        goto out;
        }
        list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
        res = 0;
out:
        spin_unlock(&svc_xprt_class_lock);
        return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
        dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
        spin_lock(&svc_xprt_class_lock);
        list_del_init(&xcl->xcl_list);
        spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
        struct list_head *le;
        char tmpstr[80];
        int len = 0;
        buf[0] = '\0';

        spin_lock(&svc_xprt_class_lock);
        list_for_each(le, &svc_xprt_class_list) {
                int slen;
                struct svc_xprt_class *xcl =
                        list_entry(le, struct svc_xprt_class, xcl_list);

                sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
                slen = strlen(tmpstr);
                if (len + slen > maxlen)
                        break;
                len += slen;
                strcat(buf, tmpstr);
        }
        spin_unlock(&svc_xprt_class_lock);

        return len;
}

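/*
 * Final destructor, invoked via kref_put() when the last reference to a
 * transport is dropped: release any cached authunix info, let the provider
 * free its private state, then drop the owning module's reference.
 */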
static void svc_xprt_free(struct kref *kref)
{
        struct svc_xprt *xprt =
                container_of(kref, struct svc_xprt, xpt_ref);
        struct module *owner = xprt->xpt_class->xcl_owner;
        if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
            && xprt->xpt_auth_cache != NULL)
                svcauth_unix_info_release(xprt->xpt_auth_cache);
        xprt->xpt_ops->xpo_free(xprt);
        module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
        kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
                   struct svc_serv *serv)
{
        memset(xprt, 0, sizeof(*xprt));
        xprt->xpt_class = xcl;
        xprt->xpt_ops = xcl->xcl_ops;
        kref_init(&xprt->xpt_ref);
        xprt->xpt_server = serv;
        INIT_LIST_HEAD(&xprt->xpt_list);
        INIT_LIST_HEAD(&xprt->xpt_ready);
        INIT_LIST_HEAD(&xprt->xpt_deferred);
        mutex_init(&xprt->xpt_mutex);
        spin_lock_init(&xprt->xpt_lock);
        set_bit(XPT_BUSY, &xprt->xpt_flags);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

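/*
 * Build a wildcard (ANY) listen address for the server's configured
 * address family and ask the transport class to create an endpoint
 * bound to it.
 */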
static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
                                         struct svc_serv *serv,
                                         unsigned short port, int flags)
{
        struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
        struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
        struct sockaddr *sap;
        size_t len;

        switch (serv->sv_family) {
        case AF_INET:
                sap = (struct sockaddr *)&sin;
                len = sizeof(sin);
                break;
        case AF_INET6:
                sap = (struct sockaddr *)&sin6;
                len = sizeof(sin6);
                break;
        default:
                return ERR_PTR(-EAFNOSUPPORT);
        }

        return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
}

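/*
 * Create a listening transport of the named class for @serv, add it to
 * sv_permsocks, and return its bound local port, or a negative errno
 * (-ENOENT if no such class is registered).  A hypothetical caller might
 * do, for example:
 *
 *      err = svc_create_xprt(serv, "tcp", 2049, 0);
 *
 * (the port and flags above are illustrative only).
 */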
int svc_create_xprt(struct svc_serv *serv, char *xprt_name, unsigned short port,
                    int flags)
{
        struct svc_xprt_class *xcl;

        dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
        spin_lock(&svc_xprt_class_lock);
        list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
                struct svc_xprt *newxprt;

                if (strcmp(xprt_name, xcl->xcl_name))
                        continue;

                if (!try_module_get(xcl->xcl_owner))
                        goto err;

                spin_unlock(&svc_xprt_class_lock);
                newxprt = __svc_xpo_create(xcl, serv, port, flags);
                if (IS_ERR(newxprt)) {
                        module_put(xcl->xcl_owner);
                        return PTR_ERR(newxprt);
                }

                clear_bit(XPT_TEMP, &newxprt->xpt_flags);
                spin_lock_bh(&serv->sv_lock);
                list_add(&newxprt->xpt_list, &serv->sv_permsocks);
                spin_unlock_bh(&serv->sv_lock);
                clear_bit(XPT_BUSY, &newxprt->xpt_flags);
                return svc_xprt_local_port(newxprt);
        }
 err:
        spin_unlock(&svc_xprt_class_lock);
        dprintk("svc: transport %s not found\n", xprt_name);
        return -ENOENT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
        struct sockaddr *sin;

        memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
        rqstp->rq_addrlen = xprt->xpt_remotelen;

        /*
         * Destination address in request is needed for binding the
         * source address in RPC replies/callbacks later.
         */
        sin = (struct sockaddr *)&xprt->xpt_local;
        switch (sin->sa_family) {
        case AF_INET:
                rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
                break;
        case AF_INET6:
                rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
                break;
        }
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
        return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
        list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
        list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
        struct svc_serv *serv = xprt->xpt_server;
        struct svc_pool *pool;
        struct svc_rqst *rqstp;
        int cpu;

        if (!(xprt->xpt_flags &
              ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
                return;

        cpu = get_cpu();
        pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
        put_cpu();

        spin_lock_bh(&pool->sp_lock);

        if (!list_empty(&pool->sp_threads) &&
            !list_empty(&pool->sp_sockets))
                printk(KERN_ERR
                       "svc_xprt_enqueue: "
                       "threads and transports both waiting??\n");

        if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
                /* Don't enqueue dead transports */
                dprintk("svc: transport %p is dead, not enqueued\n", xprt);
                goto out_unlock;
        }

        /* Mark transport as busy. It will remain in this state until
         * the provider calls svc_xprt_received. We update XPT_BUSY
         * atomically because it also guards against trying to enqueue
         * the transport twice.
         */
        if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
                /* Don't enqueue transport while already enqueued */
                dprintk("svc: transport %p busy, not enqueued\n", xprt);
                goto out_unlock;
        }
        BUG_ON(xprt->xpt_pool != NULL);
        xprt->xpt_pool = pool;

        /* Handle pending connection */
        if (test_bit(XPT_CONN, &xprt->xpt_flags))
                goto process;

        /* Handle close in-progress */
        if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                goto process;

        /* Check if we have space to reply to a request */
        if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
                /* Don't enqueue while not enough space for reply */
                dprintk("svc: no write space, transport %p not enqueued\n",
                        xprt);
                xprt->xpt_pool = NULL;
                clear_bit(XPT_BUSY, &xprt->xpt_flags);
                goto out_unlock;
        }

 process:
        if (!list_empty(&pool->sp_threads)) {
                rqstp = list_entry(pool->sp_threads.next,
                                   struct svc_rqst,
                                   rq_list);
                dprintk("svc: transport %p served by daemon %p\n",
                        xprt, rqstp);
                svc_thread_dequeue(pool, rqstp);
                if (rqstp->rq_xprt)
                        printk(KERN_ERR
                                "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
                                rqstp, rqstp->rq_xprt);
                rqstp->rq_xprt = xprt;
                svc_xprt_get(xprt);
                rqstp->rq_reserved = serv->sv_max_mesg;
                atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
                BUG_ON(xprt->xpt_pool != pool);
                wake_up(&rqstp->rq_wait);
        } else {
                dprintk("svc: transport %p put into queue\n", xprt);
                list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
                BUG_ON(xprt->xpt_pool != pool);
        }

out_unlock:
        spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
        struct svc_xprt *xprt;

        if (list_empty(&pool->sp_sockets))
                return NULL;

        xprt = list_entry(pool->sp_sockets.next,
                          struct svc_xprt, xpt_ready);
        list_del_init(&xprt->xpt_ready);

        dprintk("svc: transport %p dequeued, inuse=%d\n",
                xprt, atomic_read(&xprt->xpt_ref.refcount));

        return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
        BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
        xprt->xpt_pool = NULL;
        clear_bit(XPT_BUSY, &xprt->xpt_flags);
        svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
        space += rqstp->rq_res.head[0].iov_len;

        if (space < rqstp->rq_reserved) {
                struct svc_xprt *xprt = rqstp->rq_xprt;
                atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
                rqstp->rq_reserved = space;

                svc_xprt_enqueue(xprt);
        }
}
EXPORT_SYMBOL_GPL(svc_reserve);

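/*
 * Release everything a request holds on its transport: the provider's
 * per-request state, any deferred-request record, the response pages,
 * and the remaining write-space reservation; finally drop the transport
 * reference taken when the request was attached to it.
 */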
static void svc_xprt_release(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;

        rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

        kfree(rqstp->rq_deferred);
        rqstp->rq_deferred = NULL;

        svc_free_res_pages(rqstp);
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.page_base = 0;

        /* Reset response buffer and release
         * the reservation.
         * But first, check that enough space was reserved
         * for the reply, otherwise we have a bug!
         */
        if ((rqstp->rq_res.len) > rqstp->rq_reserved)
                printk(KERN_ERR "RPC request reserved %d but used %d\n",
                       rqstp->rq_reserved,
                       rqstp->rq_res.len);

        rqstp->rq_res.head[0].iov_len = 0;
        svc_reserve(rqstp, 0);
        rqstp->rq_xprt = NULL;

        svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
        struct svc_rqst *rqstp;
        unsigned int i;
        struct svc_pool *pool;

        for (i = 0; i < serv->sv_nrpools; i++) {
                pool = &serv->sv_pools[i];

                spin_lock_bh(&pool->sp_lock);
                if (!list_empty(&pool->sp_threads)) {
                        rqstp = list_entry(pool->sp_threads.next,
                                           struct svc_rqst,
                                           rq_list);
                        dprintk("svc: daemon %p woken up.\n", rqstp);
                        /*
                        svc_thread_dequeue(pool, rqstp);
                        rqstp->rq_xprt = NULL;
                         */
                        wake_up(&rqstp->rq_wait);
                }
                spin_unlock_bh(&pool->sp_lock);
        }
}
EXPORT_SYMBOL_GPL(svc_wake_up);

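/*
 * Return non-zero if the given peer address uses a privileged (reserved)
 * port, i.e. one below PROT_SOCK.
 */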
int svc_port_is_privileged(struct sockaddr *sin)
{
        switch (sin->sa_family) {
        case AF_INET:
                return ntohs(((struct sockaddr_in *)sin)->sin_port)
                        < PROT_SOCK;
        case AF_INET6:
                return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
                        < PROT_SOCK;
        default:
                return 0;
        }
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
        unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
                                (serv->sv_nrthreads+3) * 20;

        if (serv->sv_tmpcnt > limit) {
                struct svc_xprt *xprt = NULL;
                spin_lock_bh(&serv->sv_lock);
                if (!list_empty(&serv->sv_tempsocks)) {
                        if (net_ratelimit()) {
                                /* Try to help the admin */
                                printk(KERN_NOTICE "%s: too many open "
                                       "connections, consider increasing %s\n",
                                       serv->sv_name, serv->sv_maxconn ?
                                       "the max number of connections." :
                                       "the number of threads.");
                        }
                        /*
                         * Always select the oldest connection. It's not fair,
                         * but so is life
                         */
                        xprt = list_entry(serv->sv_tempsocks.prev,
                                          struct svc_xprt,
                                          xpt_list);
                        set_bit(XPT_CLOSE, &xprt->xpt_flags);
                        svc_xprt_get(xprt);
                }
                spin_unlock_bh(&serv->sv_lock);

                if (xprt) {
                        svc_xprt_enqueue(xprt);
                        svc_xprt_put(xprt);
                }
        }
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
        struct svc_xprt         *xprt = NULL;
        struct svc_serv         *serv = rqstp->rq_server;
        struct svc_pool         *pool = rqstp->rq_pool;
        int                     len, i;
        int                     pages;
        struct xdr_buf          *arg;
        DECLARE_WAITQUEUE(wait, current);

        dprintk("svc: server %p waiting for data (to = %ld)\n",
                rqstp, timeout);

        if (rqstp->rq_xprt)
                printk(KERN_ERR
                        "svc_recv: service %p, transport not NULL!\n",
                         rqstp);
        if (waitqueue_active(&rqstp->rq_wait))
                printk(KERN_ERR
                        "svc_recv: service %p, wait queue active!\n",
                         rqstp);

        /* now allocate needed pages.  If we get a failure, sleep briefly */
        pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
        for (i = 0; i < pages ; i++)
                while (rqstp->rq_pages[i] == NULL) {
                        struct page *p = alloc_page(GFP_KERNEL);
                        if (!p) {
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (signalled() || kthread_should_stop()) {
                                        set_current_state(TASK_RUNNING);
                                        return -EINTR;
                                }
                                schedule_timeout(msecs_to_jiffies(500));
                        }
                        rqstp->rq_pages[i] = p;
                }
        rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
        BUG_ON(pages >= RPCSVC_MAXPAGES);

        /* Make arg->head point to first page and arg->pages point to rest */
        arg = &rqstp->rq_arg;
        arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
        arg->head[0].iov_len = PAGE_SIZE;
        arg->pages = rqstp->rq_pages + 1;
        arg->page_base = 0;
        /* save at least one page for response */
        arg->page_len = (pages-2)*PAGE_SIZE;
        arg->len = (pages-1)*PAGE_SIZE;
        arg->tail[0].iov_len = 0;

        try_to_freeze();
        cond_resched();
        if (signalled() || kthread_should_stop())
                return -EINTR;

        spin_lock_bh(&pool->sp_lock);
        xprt = svc_xprt_dequeue(pool);
        if (xprt) {
                rqstp->rq_xprt = xprt;
                svc_xprt_get(xprt);
                rqstp->rq_reserved = serv->sv_max_mesg;
                atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
        } else {
                /* No data pending. Go to sleep */
                svc_thread_enqueue(pool, rqstp);

                /*
                 * We have to be able to interrupt this wait
                 * to bring down the daemons ...
                 */
                set_current_state(TASK_INTERRUPTIBLE);

                /*
                 * checking kthread_should_stop() here allows us to avoid
                 * locking and signalling when stopping kthreads that call
                 * svc_recv. If the thread has already been woken up, then
                 * we can exit here without sleeping. If not, then it'll
                 * be woken up quickly during the schedule_timeout.
                 */
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        spin_unlock_bh(&pool->sp_lock);
                        return -EINTR;
                }

                add_wait_queue(&rqstp->rq_wait, &wait);
                spin_unlock_bh(&pool->sp_lock);

                schedule_timeout(timeout);

                try_to_freeze();

                spin_lock_bh(&pool->sp_lock);
                remove_wait_queue(&rqstp->rq_wait, &wait);

                xprt = rqstp->rq_xprt;
                if (!xprt) {
                        svc_thread_dequeue(pool, rqstp);
                        spin_unlock_bh(&pool->sp_lock);
                        dprintk("svc: server %p, no data yet\n", rqstp);
                        if (signalled() || kthread_should_stop())
                                return -EINTR;
                        else
                                return -EAGAIN;
                }
        }
        spin_unlock_bh(&pool->sp_lock);

        len = 0;
        if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
                dprintk("svc_recv: found XPT_CLOSE\n");
                svc_delete_xprt(xprt);
        } else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
                struct svc_xprt *newxpt;
                newxpt = xprt->xpt_ops->xpo_accept(xprt);
                if (newxpt) {
                        /*
                         * We know this module_get will succeed because the
                         * listener holds a reference too
                         */
                        __module_get(newxpt->xpt_class->xcl_owner);
                        svc_check_conn_limits(xprt->xpt_server);
                        spin_lock_bh(&serv->sv_lock);
                        set_bit(XPT_TEMP, &newxpt->xpt_flags);
                        list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
                        serv->sv_tmpcnt++;
                        if (serv->sv_temptimer.function == NULL) {
                                /* setup timer to age temp transports */
                                setup_timer(&serv->sv_temptimer,
                                            svc_age_temp_xprts,
                                            (unsigned long)serv);
                                mod_timer(&serv->sv_temptimer,
                                          jiffies + svc_conn_age_period * HZ);
                        }
                        spin_unlock_bh(&serv->sv_lock);
                        svc_xprt_received(newxpt);
                }
                svc_xprt_received(xprt);
        } else {
                dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
                        rqstp, pool->sp_id, xprt,
                        atomic_read(&xprt->xpt_ref.refcount));
                rqstp->rq_deferred = svc_deferred_dequeue(xprt);
                if (rqstp->rq_deferred) {
                        svc_xprt_received(xprt);
                        len = svc_deferred_recv(rqstp);
                } else
                        len = xprt->xpt_ops->xpo_recvfrom(rqstp);
                dprintk("svc: got len=%d\n", len);
        }

        /* No data, incomplete (TCP) read, or accept() */
        if (len == 0 || len == -EAGAIN) {
                rqstp->rq_res.len = 0;
                svc_xprt_release(rqstp);
                return -EAGAIN;
        }
        clear_bit(XPT_OLD, &xprt->xpt_flags);

        rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
        rqstp->rq_chandle.defer = svc_defer;

        if (serv->sv_stats)
                serv->sv_stats->netcnt++;
        return len;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
        dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
        svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt;
        int             len;
        struct xdr_buf  *xb;

        xprt = rqstp->rq_xprt;
        if (!xprt)
                return -EFAULT;

        /* release the receive skb before sending the reply */
        rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

        /* calculate over-all length */
        xb = &rqstp->rq_res;
        xb->len = xb->head[0].iov_len +
                xb->page_len +
                xb->tail[0].iov_len;

        /* Grab mutex to serialize outgoing data. */
        mutex_lock(&xprt->xpt_mutex);
        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
                len = -ENOTCONN;
        else
                len = xprt->xpt_ops->xpo_sendto(rqstp);
        mutex_unlock(&xprt->xpt_mutex);
        svc_xprt_release(rqstp);

        if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
                return 0;
        return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
        struct svc_serv *serv = (struct svc_serv *)closure;
        struct svc_xprt *xprt;
        struct list_head *le, *next;
        LIST_HEAD(to_be_aged);

        dprintk("svc_age_temp_xprts\n");

        if (!spin_trylock_bh(&serv->sv_lock)) {
                /* busy, try again 1 sec later */
                dprintk("svc_age_temp_xprts: busy\n");
                mod_timer(&serv->sv_temptimer, jiffies + HZ);
                return;
        }

        list_for_each_safe(le, next, &serv->sv_tempsocks) {
                xprt = list_entry(le, struct svc_xprt, xpt_list);

                /* First time through, just mark it OLD. Second time
                 * through, close it. */
                if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
                        continue;
                if (atomic_read(&xprt->xpt_ref.refcount) > 1
                    || test_bit(XPT_BUSY, &xprt->xpt_flags))
                        continue;
                svc_xprt_get(xprt);
                list_move(le, &to_be_aged);
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
                set_bit(XPT_DETACHED, &xprt->xpt_flags);
        }
        spin_unlock_bh(&serv->sv_lock);

        while (!list_empty(&to_be_aged)) {
                le = to_be_aged.next;
                /* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
                list_del_init(le);
                xprt = list_entry(le, struct svc_xprt, xpt_list);

                dprintk("queuing xprt %p for closing\n", xprt);

                /* a thread will dequeue and close it soon */
                svc_xprt_enqueue(xprt);
                svc_xprt_put(xprt);
        }

        mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
        struct svc_serv *serv = xprt->xpt_server;
        struct svc_deferred_req *dr;

        /* Only do this once */
        if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
                return;

        dprintk("svc: svc_delete_xprt(%p)\n", xprt);
        xprt->xpt_ops->xpo_detach(xprt);

        spin_lock_bh(&serv->sv_lock);
        if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
                list_del_init(&xprt->xpt_list);
        /*
         * We used to delete the transport from whichever list
         * its sk_xprt.xpt_ready node was on, but we don't actually
         * need to.  This is because the only time we're called
         * while still attached to a queue, the queue itself
         * is about to be destroyed (in svc_destroy).
         */
        if (test_bit(XPT_TEMP, &xprt->xpt_flags))
                serv->sv_tmpcnt--;

        for (dr = svc_deferred_dequeue(xprt); dr;
             dr = svc_deferred_dequeue(xprt)) {
                svc_xprt_put(xprt);
                kfree(dr);
        }

        svc_xprt_put(xprt);
        spin_unlock_bh(&serv->sv_lock);
}

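/*
 * Close a single transport: mark it XPT_CLOSE and, unless another thread
 * already holds XPT_BUSY (in which case that thread will effect the
 * close), delete it immediately.
 */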
void svc_close_xprt(struct svc_xprt *xprt)
{
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
        if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
                /* someone else will have to effect the close */
                return;

        svc_xprt_get(xprt);
        svc_delete_xprt(xprt);
        clear_bit(XPT_BUSY, &xprt->xpt_flags);
        svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

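/*
 * Close every transport on the given list.  Used at service shutdown,
 * when no server threads remain to process transports that are still
 * queued for handling.
 */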
void svc_close_all(struct list_head *xprt_list)
{
        struct svc_xprt *xprt;
        struct svc_xprt *tmp;

        list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
                if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
                        /* Waiting to be processed, but no threads left,
                         * so just remove it from the waiting list.
                         */
                        list_del_init(&xprt->xpt_ready);
                        clear_bit(XPT_BUSY, &xprt->xpt_flags);
                }
                svc_close_xprt(xprt);
        }
}

/*
 * Handle defer and revisit of requests
 */

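/*
 * Revisit callback for a deferred request: requeue it on its transport
 * and enqueue the transport, or drop it if the transport has died or the
 * caller indicates there are already too many deferred requests.
 */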
static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
        struct svc_deferred_req *dr =
                container_of(dreq, struct svc_deferred_req, handle);
        struct svc_xprt *xprt = dr->xprt;

        spin_lock(&xprt->xpt_lock);
        set_bit(XPT_DEFERRED, &xprt->xpt_flags);
        if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
                spin_unlock(&xprt->xpt_lock);
                dprintk("revisit canceled\n");
                svc_xprt_put(xprt);
                kfree(dr);
                return;
        }
        dprintk("revisit queued\n");
        dr->xprt = NULL;
        list_add(&dr->handle.recent, &xprt->xpt_deferred);
        spin_unlock(&xprt->xpt_lock);
        svc_xprt_enqueue(xprt);
        svc_xprt_put(xprt);
}

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
        struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
        struct svc_deferred_req *dr;

        if (rqstp->rq_arg.page_len)
                return NULL; /* if more than a page, give up FIXME */
        if (rqstp->rq_deferred) {
                dr = rqstp->rq_deferred;
                rqstp->rq_deferred = NULL;
        } else {
                size_t skip;
                size_t size;
                /* FIXME maybe discard if size too large */
                size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
                dr = kmalloc(size, GFP_KERNEL);
                if (dr == NULL)
                        return NULL;

                dr->handle.owner = rqstp->rq_server;
                dr->prot = rqstp->rq_prot;
                memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
                dr->addrlen = rqstp->rq_addrlen;
                dr->daddr = rqstp->rq_daddr;
                dr->argslen = rqstp->rq_arg.len >> 2;
                dr->xprt_hlen = rqstp->rq_xprt_hlen;

                /* back up head to the start of the buffer and copy */
                skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
                memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
                       dr->argslen << 2);
        }
        svc_xprt_get(rqstp->rq_xprt);
        dr->xprt = rqstp->rq_xprt;

        dr->handle.revisit = svc_revisit;
        return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
        struct svc_deferred_req *dr = rqstp->rq_deferred;

        /* setup iov_base past transport header */
        rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
        /* The iov_len does not include the transport header bytes */
        rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
        rqstp->rq_arg.page_len = 0;
        /* The rq_arg.len includes the transport header bytes */
        rqstp->rq_arg.len     = dr->argslen<<2;
        rqstp->rq_prot        = dr->prot;
        memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
        rqstp->rq_addrlen     = dr->addrlen;
        /* Save off transport header len in case we get deferred again */
        rqstp->rq_xprt_hlen   = dr->xprt_hlen;
        rqstp->rq_daddr       = dr->daddr;
        rqstp->rq_respages    = rqstp->rq_pages;
        return (dr->argslen<<2) - dr->xprt_hlen;
}


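/*
 * Pop the next deferred request, if any, off the transport's deferral
 * list.  XPT_DEFERRED is left set whenever entries remain.
 */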
static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
        struct svc_deferred_req *dr = NULL;

        if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
                return NULL;
        spin_lock(&xprt->xpt_lock);
        clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
        if (!list_empty(&xprt->xpt_deferred)) {
                dr = list_entry(xprt->xpt_deferred.next,
                                struct svc_deferred_req,
                                handle.recent);
                list_del_init(&dr->handle.recent);
                set_bit(XPT_DEFERRED, &xprt->xpt_flags);
        }
        spin_unlock(&xprt->xpt_lock);
        return dr;
}

/*
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, char *xcl_name,
                               int af, int port)
{
        struct svc_xprt *xprt;
        struct svc_xprt *found = NULL;

        /* Sanity check the args */
        if (!serv || !xcl_name)
                return found;

        spin_lock_bh(&serv->sv_lock);
        list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
                if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
                        continue;
                if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
                        continue;
                if (port && port != svc_xprt_local_port(xprt))
                        continue;
                found = xprt;
                svc_xprt_get(xprt);
                break;
        }
        spin_unlock_bh(&serv->sv_lock);
        return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);

/*
 * Format a buffer with a list of the active transports. A zero for
 * the buflen parameter disables target buffer overflow checking.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, int buflen)
{
        struct svc_xprt *xprt;
        char xprt_str[64];
        int totlen = 0;
        int len;

        /* Sanity check args */
        if (!serv)
                return 0;

        spin_lock_bh(&serv->sv_lock);
        list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
                len = snprintf(xprt_str, sizeof(xprt_str),
                               "%s %d\n", xprt->xpt_class->xcl_name,
                               svc_xprt_local_port(xprt));
                /* If the string was truncated, replace with error string */
                if (len >= sizeof(xprt_str))
                        strcpy(xprt_str, "name-too-long\n");
                /* Don't overflow buffer */
                len = strlen(xprt_str);
                if (buflen && (len + totlen >= buflen))
                        break;
                strcpy(buf+totlen, xprt_str);
                totlen += len;
        }
        spin_unlock_bh(&serv->sv_lock);
        return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);