linux/net/sunrpc/xprt.c
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, it installs a timer that
 *      fires when the packet's timeout expires.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer is removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
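
/*
 * Illustrative request lifecycle (a sketch only; the actual sequencing
 * is driven by the RPC scheduler, but every name below is defined in
 * this file):
 *
 *      xprt_reserve()          - allocate a request slot, or sleep on the
 *                                backlog queue until one frees up
 *      xprt_prepare_transmit() - take the transport write lock
 *      xprt_transmit()         - queue the request on xprt->recv, send it
 *      reply arrives           - xprt_lookup_rqst() matches the XID, then
 *                                xprt_complete_rqst() wakes the caller
 *      timeout                 - xprt_timer() adjusts or fails the request
 *      xprt_release()          - return the slot, wake the backlog queue
 */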

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 4.4BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
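
/*
 * Registration sketch (illustrative, not part of this file): a loadable
 * transport typically registers its xprt_class on module init and
 * unregisters it on exit.  "example_transport", "example_setup" and the
 * ident value are hypothetical names used only for this sketch:
 *
 *      static struct xprt_class example_transport = {
 *              .list   = LIST_HEAD_INIT(example_transport.list),
 *              .name   = "example",
 *              .owner  = THIS_MODULE,
 *              .ident  = XPRT_TRANSPORT_EXAMPLE,  (hypothetical ident)
 *              .setup  = example_setup,
 *      };
 *
 *      static int __init example_init(void)
 *      {
 *              return xprt_register_transport(&example_transport);
 *      }
 *
 *      static void __exit example_exit(void)
 *      {
 *              xprt_unregister_transport(&example_transport);
 *      }
 */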

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:           transport successfully loaded
 * -ENOENT:     transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
        struct xprt_class *t;
        char module_name[sizeof t->name + 5];
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (strcmp(t->name, transport_name) == 0) {
                        spin_unlock(&xprt_list_lock);
                        goto out;
                }
        }
        spin_unlock(&xprt_list_lock);
        strcpy(module_name, "xprt");
        strncat(module_name, transport_name, sizeof t->name);
        result = request_module(module_name);
out:
        return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);
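
/*
 * For example, by the naming convention encoded above, a transport
 * registered under the name "rdma" lives in a module called "xprtrdma",
 * so a caller would simply do (illustrative):
 *
 *      err = xprt_load_transport("rdma");
 */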

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);
out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC task whose request recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: RPC task whose recently completed request is used to adjust the window
 * @result: result code of the completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
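
/*
 * Worked example of the update above, with illustrative numbers: at
 * cwnd = 512 (two requests' worth), a successful reply while the window
 * is full adds (256 * 256 + 256) / 512 = 128 units, giving cwnd = 640;
 * a later -ETIMEDOUT halves that to 320.  The window never drops below
 * RPC_CWNDSCALE (256) and never exceeds RPC_MAXCWND(xprt).
 */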

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rtt *rtt = clnt->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
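
/*
 * Backoff illustration (numbers are examples only): if rpc_calc_rto()
 * estimates a 1 second RTO for this procedure's timer class, the request
 * has already been retransmitted twice (rq_retries == 2), and one minor
 * timeout has been recorded via rpc_set_timeo(), then the task waits
 * 1s << (1 + 2) = 8s, clamped to the client's to_maxval.
 */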

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}
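
/*
 * For instance, with to_initval = 5s, to_increment = 5s, to_retries = 3
 * and to_exponential clear, the major timeout lands 5s + 5s * 3 = 20s
 * from now; with to_exponential set it would be 5s << 3 = 40s, in both
 * cases capped at to_maxval.
 */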

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
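
/*
 * Numeric illustration of the two branches above: with to_initval = 5s
 * and to_exponential set, each minor timeout doubles rq_timeout
 * (5s, 10s, 20s, ...) and bumps rq_retries while rq_majortimeo has not
 * yet passed; once it has, the request is reset to to_initval with
 * rq_retries = 0, and -ETIMEDOUT is returned (a major timeout).
 */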

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        xprt->ops->close(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        dprintk("RPC:       disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
                goto out;
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
        spin_unlock_bh(&xprt->transport_lock);
}
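
/*
 * Sketch of the intended calling pattern (illustrative; the cookie is
 * captured per-request in xprt_transmit() below as rq_connect_cookie):
 *
 *      xprt_conditional_disconnect(req->rq_xprt, req->rq_connect_cookie);
 *
 * so that only the first of a batch of requests sent over the same
 * connection actually breaks it.
 */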

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
        queue_work(rpciod_workqueue, &xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status == 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -EAGAIN:
                dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
                                "out\n", task->tk_pid);
                break;
        default:
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC task whose request recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC task whose request recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        xprt->stat.recvs++;
        task->tk_rtt = (long)jiffies - req->rq_xtime;

        list_del_init(&req->rq_list);
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update req->rq_received */
        smp_wmb();
        req->rq_received = copied;
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
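
/*
 * Receive-path sketch: a minimal illustration of how a transport's
 * data_ready handler is expected to combine xprt_lookup_rqst(),
 * xprt_update_rtt() and xprt_complete_rqst().  Error handling and the
 * transport-specific copy into rq_private_buf are omitted:
 *
 *      spin_lock(&xprt->transport_lock);
 *      rovr = xprt_lookup_rqst(xprt, xid);
 *      if (rovr != NULL) {
 *              task = rovr->rq_task;
 *              xprt_update_rtt(task);
 *              ... copy "copied" reply bytes into rovr->rq_private_buf ...
 *              xprt_complete_rqst(task, copied);
 *      }
 *      spin_unlock(&xprt->transport_lock);
 */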

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (task->tk_status != -ETIMEDOUT)
                return;
        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (!req->rq_received) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
        } else
                task->tk_status = 0;
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task))
                err = -EAGAIN;
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
        xprt_release_write(task->tk_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        req->rq_connect_cookie = xprt->connect_cookie;
        req->rq_xtime = jiffies;
        status = xprt->ops->send_request(task);
        if (status != 0) {
                task->tk_status = status;
                return;
        }

        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
        spin_lock_bh(&xprt->transport_lock);

        xprt->ops->set_retrans_timeout(task);

        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;

        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, xprt_timer);
        spin_unlock_bh(&xprt->transport_lock);
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        do_xprt_reserve(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        xprt->xid = net_random();
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        xprt_reset_majortimeo(req);
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident == args->ident) {
                        spin_unlock(&xprt_list_lock);
                        goto found;
                }
        }
        spin_unlock(&xprt_list_lock);
        printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
        return ERR_PTR(-EIO);

found:
        xprt = t->setup(args);
        if (IS_ERR(xprt)) {
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
                return xprt;
        }

        kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        setup_timer(&xprt->timer, xprt_init_autodisconnect,
                        (unsigned long)xprt);
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:       created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}
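
/*
 * Creation sketch (illustrative): only args->ident is examined here;
 * the remaining xprt_create fields are consumed by the matched class's
 * setup routine.  XPRT_TRANSPORT_UDP stands in for any registered ident:
 *
 *      struct xprt_create args = {
 *              .ident = XPRT_TRANSPORT_UDP,
 *              ...
 *      };
 *      xprt = xprt_create_transport(&args);
 *      if (IS_ERR(xprt))
 *              return PTR_ERR(xprt);
 */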

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

        dprintk("RPC:       destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);

        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->resend);
        rpc_destroy_wait_queue(&xprt->backlog);
        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        kref_get(&xprt->kref);
        return xprt;
}