linux/net/sunrpc/xprt.c
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
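
/*
 * A condensed sketch of that flow for a single synchronous request, as
 * seen from this file (illustrative only; error handling, async tasks
 * and the actual marshalling step are omitted):
 *
 *      xprt_reserve(task);          // get a slot, or sleep on xprt->backlog
 *      ...                          // caller marshals the RPC message
 *      xprt_prepare_transmit(task); // serialize access to the transport
 *      xprt_transmit(task);         // send, queue on xprt->recv, arm timer
 *      ...                          // reply wakes us via xprt_complete_rqst(),
 *                                   // or xprt_timer() fires with -ETIMEDOUT
 *      xprt_release(task);          // free the slot, wake the backlog queue
 */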

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
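
/*
 * Worked example of the fixed-point representation above: with
 * RPC_CWNDSHIFT = 8, RPC_CWNDSCALE is 256, so a congestion window of
 * "three requests" is stored as cwnd = 768.  __xprt_get_cong() below
 * adds RPC_CWNDSCALE to xprt->cong for every request holding a
 * congestion slot, and RPCXPRT_CONGESTED() reports true once
 * cong >= cwnd, i.e. once three such requests are in flight.
 */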

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
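
/*
 * A minimal sketch of how a transport module might use the two
 * registration calls above (the names my_transport, my_setup and
 * MY_TRANSPORT_IDENT are illustrative assumptions, not part of this
 * file):
 *
 *      static struct xprt_class my_transport = {
 *              .list   = LIST_HEAD_INIT(my_transport.list),
 *              .name   = "mytransport",
 *              .owner  = THIS_MODULE,
 *              .ident  = MY_TRANSPORT_IDENT,
 *              .setup  = my_setup,
 *      };
 *
 *      static int __init my_transport_init(void)
 *      {
 *              return xprt_register_transport(&my_transport);
 *      }
 *
 *      static void __exit my_transport_exit(void)
 *      {
 *              xprt_unregister_transport(&my_transport);
 *      }
 */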

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);
out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
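
/*
 * Worked example of the additive increase above: with RPC_CWNDSCALE
 * = 256 and cwnd = 512 (a two-request window), a reply received while
 * the window is full adds (256 * 256 + 256) / 512 = 128, i.e. half a
 * request slot, so two such replies grow the window by one slot.  A
 * timeout halves cwnd instead, but never below RPC_CWNDSCALE.
 */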

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rtt *rtt = clnt->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
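
/*
 * Example: if rpc_calc_rto() estimates the RTO at HZ / 10 and this is
 * the request's second retransmission (rq_retries == 2) with no minor
 * timeouts recorded for this timer class (rpc_ntimeo() == 0), the
 * timeout set above is (HZ / 10) << 2, i.e. four times the smoothed
 * RTO, capped at the client's to_maxval.
 */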

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
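
/*
 * Example of the two policies above: with to_initval = 5 * HZ,
 * to_increment = 5 * HZ and to_retries = 3, the linear policy grows
 * rq_timeout 5s -> 10s -> 15s and places the major timeout
 * 5 + 5 * 3 = 20 seconds after the first transmission; with
 * to_exponential set, rq_timeout doubles on each minor timeout and the
 * major timeout becomes 5 << 3 = 40 seconds (both capped at to_maxval).
 * Once the major timeout has passed, rq_timeout is reset to to_initval
 * and -ETIMEDOUT is returned so the caller can decide whether to retry.
 */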

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        xprt->ops->close(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        dprintk("RPC:       disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
                goto out;
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
out:
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        if (xprt_connecting(xprt))
                xprt_release_write(xprt, NULL);
        else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status == 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ENOTCONN:
                dprintk("RPC: %5u xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
                                "out\n", task->tk_pid);
                break;
        default:
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        xprt->stat.recvs++;
        task->tk_rtt = (long)jiffies - req->rq_xtime;

        list_del_init(&req->rq_list);
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update req->rq_received */
        smp_wmb();
        req->rq_received = copied;
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (task->tk_status != -ETIMEDOUT)
                return;
        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (!req->rq_received) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
        } else
                task->tk_status = 0;
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
        xprt_release_write(task->tk_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        req->rq_connect_cookie = xprt->connect_cookie;
        req->rq_xtime = jiffies;
        status = xprt->ops->send_request(task);
        if (status == 0) {
                dprintk("RPC: %5u xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);

                xprt->ops->set_retrans_timeout(task);

                xprt->stat.sends++;
                xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
                xprt->stat.bklog_u += xprt->backlog.qlen;

                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, xprt_timer);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;
        if (status == -ECONNREFUSED)
                rpc_sleep_on(&xprt->sending, task, NULL);
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        do_xprt_reserve(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        xprt->xid = net_random();
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        xprt_reset_majortimeo(req);
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident == args->ident) {
                        spin_unlock(&xprt_list_lock);
                        goto found;
                }
        }
        spin_unlock(&xprt_list_lock);
        printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
        return ERR_PTR(-EIO);

found:
        xprt = t->setup(args);
        if (IS_ERR(xprt)) {
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
                return xprt;
        }

        kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        setup_timer(&xprt->timer, xprt_init_autodisconnect,
                        (unsigned long)xprt);
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:       created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}
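
/*
 * A minimal sketch of how a caller such as rpc_create() might fill in
 * the creation arguments (the transport ident and the address variable
 * here are illustrative assumptions):
 *
 *      struct sockaddr_in server_addr;         // filled in by the caller
 *      struct xprt_create args = {
 *              .ident   = XPRT_TRANSPORT_UDP,
 *              .srcaddr = NULL,
 *              .dstaddr = (struct sockaddr *) &server_addr,
 *              .addrlen = sizeof(server_addr),
 *      };
 *      struct rpc_xprt *xprt = xprt_create_transport(&args);
 *
 *      if (IS_ERR(xprt))
 *              return PTR_ERR(xprt);
 */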

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

        dprintk("RPC:       destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);

        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->resend);
        rpc_destroy_wait_queue(&xprt->backlog);
        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        kref_get(&xprt->kref);
        return xprt;
}