linux/net/sunrpc/xprt.c
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, if a reply is expected,
 *      it installs a timer that is run after the packet's timeout has
 *      expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
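
/*
 * A schematic view of the slot life cycle described above (an
 * editorial sketch, not original to this file; the helpers named here
 * are the ones defined below, with error handling omitted):
 *
 *      xprt_reserve(task);             // get a slot or sleep on the backlog
 *      // ... encode the RPC message into task->tk_rqstp ...
 *      xprt_prepare_transmit(task);    // serialize access to the transport
 *      xprt_transmit(task);            // send; sleeps on xprt->pending when
 *                                      // a reply is expected
 *      xprt_end_transmit(task);        // drop write access to the transport
 *      // ... the reply wakes the task, or xprt_timer() ends the wait ...
 *      xprt_release(task);             // free the slot, wake backlog waiters
 */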

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate of the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
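
/*
 * Worked example (editorial note, illustrative numbers only): with
 * RPC_CWNDSHIFT = 8, RPC_CWNDSCALE is 256, so cwnd and cong count
 * requests in fixed-point units of 256.  A cwnd of 512 allows two
 * requests in flight; each request holding a congestion slot adds
 * RPC_CWNDSCALE to xprt->cong, and RPCXPRT_CONGESTED() reports true
 * once cong reaches cwnd.  For a transport with max_reqs = 16,
 * RPC_MAXCWND caps the window at 16 << 8 = 4096, i.e. sixteen
 * concurrent requests.
 */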

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
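
/*
 * Illustrative sketch (editorial, not part of the original file): a
 * transport module typically registers a statically defined
 * xprt_class from its module init routine.  All "myxprt" names below
 * are hypothetical:
 *
 *      static struct xprt_class myxprt_class = {
 *              .list   = LIST_HEAD_INIT(myxprt_class.list),
 *              .name   = "myxprt",
 *              .owner  = THIS_MODULE,
 *              .ident  = 42,   // a unique transport identifier
 *              .setup  = myxprt_setup,
 *      };
 *
 *      static int __init myxprt_init(void)
 *      {
 *              return xprt_register_transport(&myxprt_class);
 *      }
 *
 *      static void __exit myxprt_exit(void)
 *      {
 *              xprt_unregister_transport(&myxprt_class);
 *      }
 */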

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:           transport successfully loaded
 * -ENOENT:     transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (strcmp(t->name, transport_name) == 0) {
                        spin_unlock(&xprt_list_lock);
                        goto out;
                }
        }
        spin_unlock(&xprt_list_lock);
        result = request_module("xprt%s", transport_name);
out:
        return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);
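
/*
 * Editorial note: for the request_module() call above to succeed, the
 * transport must be loadable under the name "xprt<name>", either as
 * the module's own name or via a module alias.  Continuing the
 * hypothetical example:
 *
 *      MODULE_ALIAS("xprtmyxprt");
 *
 * lets xprt_load_transport("myxprt") pull the module in on demand.
 */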

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);
out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
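
/*
 * Worked example (editorial, illustrative numbers): with
 * RPC_CWNDSCALE = 256 and cwnd = 1024 (four requests in flight), a
 * successful reply at full utilization adds
 * (256 * 256 + 512) / 1024 = 64 to cwnd, so the window grows by one
 * full request only after about cwnd / RPC_CWNDSCALE = 4 further
 * replies (additive increase).  An -ETIMEDOUT result instead halves
 * cwnd to 512 (multiplicative decrease), and the window never drops
 * below RPC_CWNDSCALE, i.e. one outstanding request.
 */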

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rtt *rtt = clnt->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
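
/*
 * Worked example (editorial, hypothetical values): if rpc_calc_rto()
 * estimates the RTO at HZ / 10 and the request has already been
 * retried twice (rq_retries == 2) with no backoff history recorded by
 * the estimator (rpc_ntimeo() == 0), the timeout becomes
 * (HZ / 10) << 2, i.e. four tenths of a second, clamped to the
 * client's to_maxval.
 */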

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
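
/*
 * Worked example (editorial, hypothetical timeout values): with
 * to_initval = 5 * HZ, to_increment = 5 * HZ, to_retries = 3 and
 * to_exponential = 0, xprt_reset_majortimeo() sets rq_majortimeo
 * 5s + 5s * 3 = 20s into the future.  Each expiry before that
 * deadline is a minor timeout: rq_timeout grows by 5s and rq_retries
 * is bumped.  The first expiry past the deadline is a major timeout:
 * rq_timeout returns to to_initval, the RTT estimator is reset
 * ("slow start"), and -ETIMEDOUT tells the caller to report it.
 */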

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        xprt->ops->close(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        dprintk("RPC:       disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
                goto out;
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
        queue_work(rpciod_workqueue, &xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;

        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
                xprt->ops->close(xprt);

        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = task->tk_rqstp->rq_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);

                if (test_bit(XPRT_CLOSING, &xprt->state))
                        return;
                if (xprt_test_and_set_connecting(xprt))
                        return;
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status == 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -EAGAIN:
                dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
                                "out\n", task->tk_pid);
                break;
        default:
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
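
/*
 * Illustrative receive path (editorial sketch of how a transport's
 * data_ready handler is expected to use xprt_lookup_rqst() and
 * xprt_complete_rqst(); both calls must happen under the transport
 * lock):
 *
 *      spin_lock(&xprt->transport_lock);
 *      rovr = xprt_lookup_rqst(xprt, *xid);
 *      if (rovr != NULL) {
 *              // ... copy the reply into rovr->rq_private_buf ...
 *              xprt_complete_rqst(rovr->rq_task, copied);
 *      }
 *      spin_unlock(&xprt->transport_lock);
 */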

static void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;
        long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer, m);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        xprt->stat.recvs++;
        req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime);
        if (xprt->ops->timer != NULL)
                xprt_update_rtt(task);

        list_del_init(&req->rq_list);
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update */
        /* req->rq_reply_bytes_recvd */
        smp_wmb();
        req->rq_reply_bytes_recvd = copied;
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (task->tk_status != -ETIMEDOUT)
                return;
        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (!req->rq_reply_bytes_recvd) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
        } else
                task->tk_status = 0;
        spin_unlock_bh(&xprt->transport_lock);
}

static inline int xprt_has_timer(struct rpc_xprt *xprt)
{
        return xprt->idle_timeout != 0;
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
                err = req->rq_reply_bytes_recvd;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task))
                err = -EAGAIN;
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
        xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        if (!req->rq_reply_bytes_recvd) {
                if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
                        /*
                         * Add to the list only if we're expecting a reply
                         */
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        req->rq_connect_cookie = xprt->connect_cookie;
        req->rq_xtime = ktime_get();
        status = xprt->ops->send_request(task);
        if (status != 0) {
                task->tk_status = status;
                return;
        }

        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
        spin_lock_bh(&xprt->transport_lock);

        xprt->ops->set_retrans_timeout(task);

        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;

        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
                /*
                 * Sleep on the pending queue since
                 * we're expecting a reply.
                 */
                rpc_sleep_on(&xprt->pending, task, xprt_timer);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

static void xprt_alloc_slot(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL);
}

static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        memset(req, 0, sizeof(*req));   /* mark unused */

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        xprt_alloc_slot(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return (__force __be32)xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        xprt->xid = net_random();
}
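
/*
 * Editorial note: XIDs are therefore a simple counter seeded at a
 * random point.  If, say, net_random() returned 0x1b52f3a7 (an
 * arbitrary example value), successive requests would carry XIDs
 * 0x1b52f3a7, 0x1b52f3a8, and so on.
 */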

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        xprt_reset_majortimeo(req);
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;

        xprt = req->rq_xprt;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        if (req->rq_buffer)
                xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);

        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
        if (likely(!bc_prealloc(req)))
                xprt_free_slot(xprt, req);
        else
                xprt_free_bc_request(req);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident == args->ident) {
                        spin_unlock(&xprt_list_lock);
                        goto found;
                }
        }
        spin_unlock(&xprt_list_lock);
        printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
        return ERR_PTR(-EIO);

found:
        xprt = t->setup(args);
        if (IS_ERR(xprt)) {
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
                return xprt;
        }

        kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
#if defined(CONFIG_NFS_V4_1)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_NFS_V4_1 */

        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        if (xprt_has_timer(xprt))
                setup_timer(&xprt->timer, xprt_init_autodisconnect,
                            (unsigned long)xprt);
        else
                init_timer(&xprt->timer);
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:       created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);
        return xprt;
}
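
/*
 * Illustrative caller sketch (editorial; schematically what
 * rpc_create() does with this interface, assuming only the ident,
 * srcaddr, dstaddr and addrlen fields of struct xprt_create):
 *
 *      struct sockaddr_in server_addr = { ... };
 *      struct xprt_create xprtargs = {
 *              .ident          = XPRT_TRANSPORT_TCP,
 *              .srcaddr        = NULL,
 *              .dstaddr        = (struct sockaddr *)&server_addr,
 *              .addrlen        = sizeof(server_addr),
 *      };
 *      struct rpc_xprt *xprt = xprt_create_transport(&xprtargs);
 *
 *      if (IS_ERR(xprt))
 *              return PTR_ERR(xprt);
 */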

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

        dprintk("RPC:       destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);

        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->resend);
        rpc_destroy_wait_queue(&xprt->backlog);
        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        kref_get(&xprt->kref);
        return xprt;
}