linux/net/sunrpc/clnt.c
   1/*
   2 *  linux/net/sunrpc/clnt.c
   3 *
   4 *  This file contains the high-level RPC interface.
   5 *  It is modeled as a finite state machine to support both synchronous
   6 *  and asynchronous requests.
   7 *
   8 *  -   RPC header generation and argument serialization.
   9 *  -   Credential refresh.
  10 *  -   TCP connect handling.
  11 *  -   Retry of operation when it is suspected the operation failed because
  12 *      of uid squashing on the server, or when the credentials were stale
  13 *      and need to be refreshed, or when a packet was damaged in transit.
   14 *      This may have to be moved to the VFS layer.
  15 *
  16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  18 */
  19
  20
  21#include <linux/module.h>
  22#include <linux/types.h>
  23#include <linux/kallsyms.h>
  24#include <linux/mm.h>
  25#include <linux/namei.h>
  26#include <linux/mount.h>
  27#include <linux/slab.h>
  28#include <linux/utsname.h>
  29#include <linux/workqueue.h>
  30#include <linux/in.h>
  31#include <linux/in6.h>
  32#include <linux/un.h>
  33#include <linux/rcupdate.h>
  34
  35#include <linux/sunrpc/clnt.h>
  36#include <linux/sunrpc/rpc_pipe_fs.h>
  37#include <linux/sunrpc/metrics.h>
  38#include <linux/sunrpc/bc_xprt.h>
  39#include <trace/events/sunrpc.h>
  40
  41#include "sunrpc.h"
  42#include "netns.h"
  43
  44#ifdef RPC_DEBUG
  45# define RPCDBG_FACILITY        RPCDBG_CALL
  46#endif
  47
  48#define dprint_status(t)                                        \
  49        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
  50                        __func__, t->tk_status)
  51
  52/*
   53 * rpc_shutdown_client() waits on this queue for all of a client's tasks to exit
  54 */
  55
  56static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  57
  58
  59static void     call_start(struct rpc_task *task);
  60static void     call_reserve(struct rpc_task *task);
  61static void     call_reserveresult(struct rpc_task *task);
  62static void     call_allocate(struct rpc_task *task);
  63static void     call_decode(struct rpc_task *task);
  64static void     call_bind(struct rpc_task *task);
  65static void     call_bind_status(struct rpc_task *task);
  66static void     call_transmit(struct rpc_task *task);
  67#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  68static void     call_bc_transmit(struct rpc_task *task);
  69#endif /* CONFIG_SUNRPC_BACKCHANNEL */
  70static void     call_status(struct rpc_task *task);
  71static void     call_transmit_status(struct rpc_task *task);
  72static void     call_refresh(struct rpc_task *task);
  73static void     call_refreshresult(struct rpc_task *task);
  74static void     call_timeout(struct rpc_task *task);
  75static void     call_connect(struct rpc_task *task);
  76static void     call_connect_status(struct rpc_task *task);
  77
  78static __be32   *rpc_encode_header(struct rpc_task *task);
  79static __be32   *rpc_verify_header(struct rpc_task *task);
  80static int      rpc_ping(struct rpc_clnt *clnt);
  81
  82static void rpc_register_client(struct rpc_clnt *clnt)
  83{
  84        struct net *net = rpc_net_ns(clnt);
  85        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  86
  87        spin_lock(&sn->rpc_client_lock);
  88        list_add(&clnt->cl_clients, &sn->all_clients);
  89        spin_unlock(&sn->rpc_client_lock);
  90}
  91
  92static void rpc_unregister_client(struct rpc_clnt *clnt)
  93{
  94        struct net *net = rpc_net_ns(clnt);
  95        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  96
  97        spin_lock(&sn->rpc_client_lock);
  98        list_del(&clnt->cl_clients);
  99        spin_unlock(&sn->rpc_client_lock);
 100}
 101
 102static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 103{
 104        if (clnt->cl_dentry) {
 105                if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
 106                        clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
 107                rpc_remove_client_dir(clnt->cl_dentry);
 108        }
 109        clnt->cl_dentry = NULL;
 110}
 111
 112static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 113{
 114        struct net *net = rpc_net_ns(clnt);
 115        struct super_block *pipefs_sb;
 116
 117        pipefs_sb = rpc_get_sb_net(net);
 118        if (pipefs_sb) {
 119                __rpc_clnt_remove_pipedir(clnt);
 120                rpc_put_sb_net(net);
 121        }
 122}
 123
 124static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 125                                    struct rpc_clnt *clnt,
 126                                    const char *dir_name)
 127{
 128        static uint32_t clntid;
 129        char name[15];
 130        struct qstr q = { .name = name };
 131        struct dentry *dir, *dentry;
 132        int error;
 133
 134        dir = rpc_d_lookup_sb(sb, dir_name);
 135        if (dir == NULL) {
 136                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 137                return dir;
 138        }
 139        for (;;) {
 140                q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 141                name[sizeof(name) - 1] = '\0';
 142                q.hash = full_name_hash(q.name, q.len);
 143                dentry = rpc_create_client_dir(dir, &q, clnt);
 144                if (!IS_ERR(dentry))
 145                        break;
 146                error = PTR_ERR(dentry);
 147                if (error != -EEXIST) {
 148                        printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 149                                        " %s/%s, error %d\n",
 150                                        dir_name, name, error);
 151                        break;
 152                }
 153        }
 154        dput(dir);
 155        return dentry;
 156}
 157
 158static int
 159rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name)
 160{
 161        struct net *net = rpc_net_ns(clnt);
 162        struct super_block *pipefs_sb;
 163        struct dentry *dentry;
 164
 165        clnt->cl_dentry = NULL;
 166        if (dir_name == NULL)
 167                return 0;
 168        pipefs_sb = rpc_get_sb_net(net);
 169        if (!pipefs_sb)
 170                return 0;
 171        dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
 172        rpc_put_sb_net(net);
 173        if (IS_ERR(dentry))
 174                return PTR_ERR(dentry);
 175        clnt->cl_dentry = dentry;
 176        return 0;
 177}
 178
 179static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 180{
 181        if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
 182            ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
 183                return 1;
 184        return 0;
 185}
 186
 187static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 188                                   struct super_block *sb)
 189{
 190        struct dentry *dentry;
 191        int err = 0;
 192
 193        switch (event) {
 194        case RPC_PIPEFS_MOUNT:
 195                dentry = rpc_setup_pipedir_sb(sb, clnt,
 196                                              clnt->cl_program->pipe_dir_name);
 197                if (!dentry)
 198                        return -ENOENT;
 199                if (IS_ERR(dentry))
 200                        return PTR_ERR(dentry);
 201                clnt->cl_dentry = dentry;
 202                if (clnt->cl_auth->au_ops->pipes_create) {
 203                        err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
 204                        if (err)
 205                                __rpc_clnt_remove_pipedir(clnt);
 206                }
 207                break;
 208        case RPC_PIPEFS_UMOUNT:
 209                __rpc_clnt_remove_pipedir(clnt);
 210                break;
 211        default:
 212                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 213                return -ENOTSUPP;
 214        }
 215        return err;
 216}
 217
 218static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 219                                struct super_block *sb)
 220{
 221        int error = 0;
 222
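             /* Handle the event for this client and for each of its parents */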
 223        for (;; clnt = clnt->cl_parent) {
 224                if (!rpc_clnt_skip_event(clnt, event))
 225                        error = __rpc_clnt_handle_event(clnt, event, sb);
 226                if (error || clnt == clnt->cl_parent)
 227                        break;
 228        }
 229        return error;
 230}
 231
 232static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 233{
 234        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 235        struct rpc_clnt *clnt;
 236
 237        spin_lock(&sn->rpc_client_lock);
 238        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 239                if (clnt->cl_program->pipe_dir_name == NULL)
 240                        continue;
 241                if (rpc_clnt_skip_event(clnt, event))
 242                        continue;
 243                if (atomic_inc_not_zero(&clnt->cl_count) == 0)
 244                        continue;
 245                spin_unlock(&sn->rpc_client_lock);
 246                return clnt;
 247        }
 248        spin_unlock(&sn->rpc_client_lock);
 249        return NULL;
 250}
 251
 252static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 253                            void *ptr)
 254{
 255        struct super_block *sb = ptr;
 256        struct rpc_clnt *clnt;
 257        int error = 0;
 258
 259        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 260                error = __rpc_pipefs_event(clnt, event, sb);
 261                rpc_release_client(clnt);
 262                if (error)
 263                        break;
 264        }
 265        return error;
 266}
 267
 268static struct notifier_block rpc_clients_block = {
 269        .notifier_call  = rpc_pipefs_event,
 270        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 271};
 272
 273int rpc_clients_notifier_register(void)
 274{
 275        return rpc_pipefs_notifier_register(&rpc_clients_block);
 276}
 277
 278void rpc_clients_notifier_unregister(void)
 279{
 280        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 281}
 282
 283static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 284{
 285        clnt->cl_nodelen = strlen(nodename);
 286        if (clnt->cl_nodelen > UNX_MAXNODENAME)
 287                clnt->cl_nodelen = UNX_MAXNODENAME;
 288        memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
 289}
 290
 291static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
 292{
 293        const struct rpc_program *program = args->program;
 294        const struct rpc_version *version;
 295        struct rpc_clnt         *clnt = NULL;
 296        struct rpc_auth         *auth;
 297        int err;
 298
 299        /* sanity check the name before trying to print it */
 300        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 301                        program->name, args->servername, xprt);
 302
 303        err = rpciod_up();
 304        if (err)
 305                goto out_no_rpciod;
 306        err = -EINVAL;
 307        if (!xprt)
 308                goto out_no_xprt;
 309
 310        if (args->version >= program->nrvers)
 311                goto out_err;
 312        version = program->version[args->version];
 313        if (version == NULL)
 314                goto out_err;
 315
 316        err = -ENOMEM;
 317        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 318        if (!clnt)
 319                goto out_err;
 320        clnt->cl_parent = clnt;
 321
 322        rcu_assign_pointer(clnt->cl_xprt, xprt);
 323        clnt->cl_procinfo = version->procs;
 324        clnt->cl_maxproc  = version->nrprocs;
 325        clnt->cl_protname = program->name;
 326        clnt->cl_prog     = args->prognumber ? : program->number;
 327        clnt->cl_vers     = version->number;
 328        clnt->cl_stats    = program->stats;
 329        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 330        err = -ENOMEM;
 331        if (clnt->cl_metrics == NULL)
 332                goto out_no_stats;
 333        clnt->cl_program  = program;
 334        INIT_LIST_HEAD(&clnt->cl_tasks);
 335        spin_lock_init(&clnt->cl_lock);
 336
 337        if (!xprt_bound(xprt))
 338                clnt->cl_autobind = 1;
 339
 340        clnt->cl_timeout = xprt->timeout;
 341        if (args->timeout != NULL) {
 342                memcpy(&clnt->cl_timeout_default, args->timeout,
 343                                sizeof(clnt->cl_timeout_default));
 344                clnt->cl_timeout = &clnt->cl_timeout_default;
 345        }
 346
 347        clnt->cl_rtt = &clnt->cl_rtt_default;
 348        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 349        clnt->cl_principal = NULL;
 350        if (args->client_name) {
 351                clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
 352                if (!clnt->cl_principal)
 353                        goto out_no_principal;
 354        }
 355
 356        atomic_set(&clnt->cl_count, 1);
 357
 358        err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
 359        if (err < 0)
 360                goto out_no_path;
 361
 362        auth = rpcauth_create(args->authflavor, clnt);
 363        if (IS_ERR(auth)) {
 364                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
 365                                args->authflavor);
 366                err = PTR_ERR(auth);
 367                goto out_no_auth;
 368        }
 369
 370        /* save the nodename */
 371        rpc_clnt_set_nodename(clnt, utsname()->nodename);
 372        rpc_register_client(clnt);
 373        return clnt;
 374
 375out_no_auth:
 376        rpc_clnt_remove_pipedir(clnt);
 377out_no_path:
 378        kfree(clnt->cl_principal);
 379out_no_principal:
 380        rpc_free_iostats(clnt->cl_metrics);
 381out_no_stats:
 382        kfree(clnt);
 383out_err:
 384        xprt_put(xprt);
 385out_no_xprt:
 386        rpciod_down();
 387out_no_rpciod:
 388        return ERR_PTR(err);
 389}
 390
 391/**
 392 * rpc_create - create an RPC client and transport with one call
 393 * @args: rpc_clnt create argument structure
 394 *
 395 * Creates and initializes an RPC transport and an RPC client.
 396 *
 397 * It can ping the server in order to determine if it is up, and to see if
 398 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 399 * this behavior so asynchronous tasks can also use rpc_create.
 400 */
 401struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 402{
 403        struct rpc_xprt *xprt;
 404        struct rpc_clnt *clnt;
 405        struct xprt_create xprtargs = {
 406                .net = args->net,
 407                .ident = args->protocol,
 408                .srcaddr = args->saddress,
 409                .dstaddr = args->address,
 410                .addrlen = args->addrsize,
 411                .servername = args->servername,
 412                .bc_xprt = args->bc_xprt,
 413        };
 414        char servername[48];
 415
 416        /*
 417         * If the caller chooses not to specify a hostname, whip
 418         * up a string representation of the passed-in address.
 419         */
 420        if (xprtargs.servername == NULL) {
 421                struct sockaddr_un *sun =
 422                                (struct sockaddr_un *)args->address;
 423                struct sockaddr_in *sin =
 424                                (struct sockaddr_in *)args->address;
 425                struct sockaddr_in6 *sin6 =
 426                                (struct sockaddr_in6 *)args->address;
 427
 428                servername[0] = '\0';
 429                switch (args->address->sa_family) {
 430                case AF_LOCAL:
 431                        snprintf(servername, sizeof(servername), "%s",
 432                                 sun->sun_path);
 433                        break;
 434                case AF_INET:
 435                        snprintf(servername, sizeof(servername), "%pI4",
 436                                 &sin->sin_addr.s_addr);
 437                        break;
 438                case AF_INET6:
 439                        snprintf(servername, sizeof(servername), "%pI6",
 440                                 &sin6->sin6_addr);
 441                        break;
 442                default:
 443                        /* caller wants default server name, but
 444                         * address family isn't recognized. */
 445                        return ERR_PTR(-EINVAL);
 446                }
 447                xprtargs.servername = servername;
 448        }
 449
 450        xprt = xprt_create_transport(&xprtargs);
 451        if (IS_ERR(xprt))
 452                return (struct rpc_clnt *)xprt;
 453
 454        /*
 455         * By default, kernel RPC client connects from a reserved port.
 456         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 457         * but it is always enabled for rpciod, which handles the connect
 458         * operation.
 459         */
 460        xprt->resvport = 1;
 461        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 462                xprt->resvport = 0;
 463
 464        clnt = rpc_new_client(args, xprt);
 465        if (IS_ERR(clnt))
 466                return clnt;
 467
 468        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 469                int err = rpc_ping(clnt);
 470                if (err != 0) {
 471                        rpc_shutdown_client(clnt);
 472                        return ERR_PTR(err);
 473                }
 474        }
 475
 476        clnt->cl_softrtry = 1;
 477        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 478                clnt->cl_softrtry = 0;
 479
 480        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 481                clnt->cl_autobind = 1;
 482        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 483                clnt->cl_discrtry = 1;
 484        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 485                clnt->cl_chatty = 1;
 486
 487        return clnt;
 488}
 489EXPORT_SYMBOL_GPL(rpc_create);
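     /*
      * Usage sketch (illustrative, not part of this file): a caller fills in
      * a struct rpc_create_args and lets rpc_create() build the transport and
      * the client in one call.  "my_rpc_program" and "server_addr" below are
      * hypothetical placeholders.
      *
      *     struct rpc_create_args args = {
      *             .net            = &init_net,
      *             .protocol       = XPRT_TRANSPORT_TCP,
      *             .address        = (struct sockaddr *)&server_addr,
      *             .addrsize       = sizeof(server_addr),
      *             .servername     = "server.example.net",
      *             .program        = &my_rpc_program,
      *             .version        = 3,
      *             .authflavor     = RPC_AUTH_UNIX,
      *             .flags          = RPC_CLNT_CREATE_NOPING,
      *     };
      *     struct rpc_clnt *clnt = rpc_create(&args);
      *
      *     if (IS_ERR(clnt))
      *             return PTR_ERR(clnt);
      */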
 490
 491/*
 492 * This function clones the RPC client structure. It allows us to share the
 493 * same transport while varying parameters such as the authentication
 494 * flavour.
 495 */
 496static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 497                                           struct rpc_clnt *clnt)
 498{
 499        struct rpc_xprt *xprt;
 500        struct rpc_clnt *new;
 501        int err;
 502
 503        err = -ENOMEM;
 504        rcu_read_lock();
 505        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 506        rcu_read_unlock();
 507        if (xprt == NULL)
 508                goto out_err;
 509        args->servername = xprt->servername;
 510
 511        new = rpc_new_client(args, xprt);
 512        if (IS_ERR(new)) {
 513                err = PTR_ERR(new);
 514                goto out_put;
 515        }
 516
 517        atomic_inc(&clnt->cl_count);
 518        new->cl_parent = clnt;
 519
 520        /* Turn off autobind on clones */
 521        new->cl_autobind = 0;
 522        new->cl_softrtry = clnt->cl_softrtry;
 523        new->cl_discrtry = clnt->cl_discrtry;
 524        new->cl_chatty = clnt->cl_chatty;
 525        return new;
 526
 527out_put:
 528        xprt_put(xprt);
 529out_err:
 530        dprintk("RPC:       %s: returned error %d\n", __func__, err);
 531        return ERR_PTR(err);
 532}
 533
 534/**
 535 * rpc_clone_client - Clone an RPC client structure
 536 *
 537 * @clnt: RPC client whose parameters are copied
 538 *
 539 * Returns a fresh RPC client or an ERR_PTR.
 540 */
 541struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 542{
 543        struct rpc_create_args args = {
 544                .program        = clnt->cl_program,
 545                .prognumber     = clnt->cl_prog,
 546                .version        = clnt->cl_vers,
 547                .authflavor     = clnt->cl_auth->au_flavor,
 548                .client_name    = clnt->cl_principal,
 549        };
 550        return __rpc_clone_client(&args, clnt);
 551}
 552EXPORT_SYMBOL_GPL(rpc_clone_client);
 553
 554/**
 555 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 556 *
 557 * @clnt: RPC client whose parameters are copied
 558 * @flavor: security flavor for new client
 559 *
 560 * Returns a fresh RPC client or an ERR_PTR.
 561 */
 562struct rpc_clnt *
 563rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 564{
 565        struct rpc_create_args args = {
 566                .program        = clnt->cl_program,
 567                .prognumber     = clnt->cl_prog,
 568                .version        = clnt->cl_vers,
 569                .authflavor     = flavor,
 570                .client_name    = clnt->cl_principal,
 571        };
 572        return __rpc_clone_client(&args, clnt);
 573}
 574EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
 575
 576/*
 577 * Kill all tasks for the given client.
 578 * XXX: kill their descendants as well?
 579 */
 580void rpc_killall_tasks(struct rpc_clnt *clnt)
 581{
 582        struct rpc_task *rovr;
 583
 584
 585        if (list_empty(&clnt->cl_tasks))
 586                return;
 587        dprintk("RPC:       killing all tasks for client %p\n", clnt);
 588        /*
 589         * Spin lock all_tasks to prevent changes...
 590         */
 591        spin_lock(&clnt->cl_lock);
 592        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
 593                if (!RPC_IS_ACTIVATED(rovr))
 594                        continue;
 595                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 596                        rovr->tk_flags |= RPC_TASK_KILLED;
 597                        rpc_exit(rovr, -EIO);
 598                        if (RPC_IS_QUEUED(rovr))
 599                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
 600                                                        rovr);
 601                }
 602        }
 603        spin_unlock(&clnt->cl_lock);
 604}
 605EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 606
 607/*
 608 * Properly shut down an RPC client, terminating all outstanding
 609 * requests.
 610 */
 611void rpc_shutdown_client(struct rpc_clnt *clnt)
 612{
 613        might_sleep();
 614
 615        dprintk_rcu("RPC:       shutting down %s client for %s\n",
 616                        clnt->cl_protname,
 617                        rcu_dereference(clnt->cl_xprt)->servername);
 618
 619        while (!list_empty(&clnt->cl_tasks)) {
 620                rpc_killall_tasks(clnt);
 621                wait_event_timeout(destroy_wait,
 622                        list_empty(&clnt->cl_tasks), 1*HZ);
 623        }
 624
 625        rpc_release_client(clnt);
 626}
 627EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 628
 629/*
 630 * Free an RPC client
 631 */
 632static void
 633rpc_free_client(struct rpc_clnt *clnt)
 634{
 635        dprintk_rcu("RPC:       destroying %s client for %s\n",
 636                        clnt->cl_protname,
 637                        rcu_dereference(clnt->cl_xprt)->servername);
 638        if (clnt->cl_parent != clnt)
 639                rpc_release_client(clnt->cl_parent);
 640        rpc_unregister_client(clnt);
 641        rpc_clnt_remove_pipedir(clnt);
 642        rpc_free_iostats(clnt->cl_metrics);
 643        kfree(clnt->cl_principal);
 644        clnt->cl_metrics = NULL;
 645        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 646        rpciod_down();
 647        kfree(clnt);
 648}
 649
 650/*
  651 * Release an RPC client's auth handle, then free the client itself
 652 */
 653static void
 654rpc_free_auth(struct rpc_clnt *clnt)
 655{
 656        if (clnt->cl_auth == NULL) {
 657                rpc_free_client(clnt);
 658                return;
 659        }
 660
 661        /*
 662         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 663         *       release remaining GSS contexts. This mechanism ensures
 664         *       that it can do so safely.
 665         */
 666        atomic_inc(&clnt->cl_count);
 667        rpcauth_release(clnt->cl_auth);
 668        clnt->cl_auth = NULL;
 669        if (atomic_dec_and_test(&clnt->cl_count))
 670                rpc_free_client(clnt);
 671}
 672
 673/*
 674 * Release reference to the RPC client
 675 */
 676void
 677rpc_release_client(struct rpc_clnt *clnt)
 678{
 679        dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 680
 681        if (list_empty(&clnt->cl_tasks))
 682                wake_up(&destroy_wait);
 683        if (atomic_dec_and_test(&clnt->cl_count))
 684                rpc_free_auth(clnt);
 685}
 686
 687/**
 688 * rpc_bind_new_program - bind a new RPC program to an existing client
 689 * @old: old rpc_client
 690 * @program: rpc program to set
 691 * @vers: rpc program version
 692 *
 693 * Clones the rpc client and sets up a new RPC program. This is mainly
 694 * of use for enabling different RPC programs to share the same transport.
 695 * The Sun NFSv2/v3 ACL protocol can do this.
 696 */
 697struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 698                                      const struct rpc_program *program,
 699                                      u32 vers)
 700{
 701        struct rpc_create_args args = {
 702                .program        = program,
 703                .prognumber     = program->number,
 704                .version        = vers,
 705                .authflavor     = old->cl_auth->au_flavor,
 706                .client_name    = old->cl_principal,
 707        };
 708        struct rpc_clnt *clnt;
 709        int err;
 710
 711        clnt = __rpc_clone_client(&args, old);
 712        if (IS_ERR(clnt))
 713                goto out;
 714        err = rpc_ping(clnt);
 715        if (err != 0) {
 716                rpc_shutdown_client(clnt);
 717                clnt = ERR_PTR(err);
 718        }
 719out:
 720        return clnt;
 721}
 722EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 723
 724void rpc_task_release_client(struct rpc_task *task)
 725{
 726        struct rpc_clnt *clnt = task->tk_client;
 727
 728        if (clnt != NULL) {
 729                /* Remove from client task list */
 730                spin_lock(&clnt->cl_lock);
 731                list_del(&task->tk_task);
 732                spin_unlock(&clnt->cl_lock);
 733                task->tk_client = NULL;
 734
 735                rpc_release_client(clnt);
 736        }
 737}
 738
 739static
 740void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 741{
 742        if (clnt != NULL) {
 743                rpc_task_release_client(task);
 744                task->tk_client = clnt;
 745                atomic_inc(&clnt->cl_count);
 746                if (clnt->cl_softrtry)
 747                        task->tk_flags |= RPC_TASK_SOFT;
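                     /* If the transport is being used for swap, mark the task as
                      * a swapper task so that memory reclaim can make progress. */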
 748                if (sk_memalloc_socks()) {
 749                        struct rpc_xprt *xprt;
 750
 751                        rcu_read_lock();
 752                        xprt = rcu_dereference(clnt->cl_xprt);
 753                        if (xprt->swapper)
 754                                task->tk_flags |= RPC_TASK_SWAPPER;
 755                        rcu_read_unlock();
 756                }
 757                /* Add to the client's list of all tasks */
 758                spin_lock(&clnt->cl_lock);
 759                list_add_tail(&task->tk_task, &clnt->cl_tasks);
 760                spin_unlock(&clnt->cl_lock);
 761        }
 762}
 763
 764void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
 765{
 766        rpc_task_release_client(task);
 767        rpc_task_set_client(task, clnt);
 768}
 769EXPORT_SYMBOL_GPL(rpc_task_reset_client);
 770
 771
 772static void
 773rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
 774{
 775        if (msg != NULL) {
 776                task->tk_msg.rpc_proc = msg->rpc_proc;
 777                task->tk_msg.rpc_argp = msg->rpc_argp;
 778                task->tk_msg.rpc_resp = msg->rpc_resp;
 779                if (msg->rpc_cred != NULL)
 780                        task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
 781        }
 782}
 783
 784/*
 785 * Default callback for async RPC calls
 786 */
 787static void
 788rpc_default_callback(struct rpc_task *task, void *data)
 789{
 790}
 791
 792static const struct rpc_call_ops rpc_default_ops = {
 793        .rpc_call_done = rpc_default_callback,
 794};
 795
 796/**
 797 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 798 * @task_setup_data: pointer to task initialisation data
 799 */
 800struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
 801{
 802        struct rpc_task *task;
 803
 804        task = rpc_new_task(task_setup_data);
 805        if (IS_ERR(task))
 806                goto out;
 807
 808        rpc_task_set_client(task, task_setup_data->rpc_client);
 809        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
 810
 811        if (task->tk_action == NULL)
 812                rpc_call_start(task);
 813
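             /* Take an extra reference so the task stays valid after rpc_execute();
              * the caller releases it with rpc_put_task(). */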
 814        atomic_inc(&task->tk_count);
 815        rpc_execute(task);
 816out:
 817        return task;
 818}
 819EXPORT_SYMBOL_GPL(rpc_run_task);
 820
 821/**
 822 * rpc_call_sync - Perform a synchronous RPC call
 823 * @clnt: pointer to RPC client
 824 * @msg: RPC call parameters
 825 * @flags: RPC call flags
 826 */
 827int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 828{
 829        struct rpc_task *task;
 830        struct rpc_task_setup task_setup_data = {
 831                .rpc_client = clnt,
 832                .rpc_message = msg,
 833                .callback_ops = &rpc_default_ops,
 834                .flags = flags,
 835        };
 836        int status;
 837
 838        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
 839        if (flags & RPC_TASK_ASYNC) {
 840                rpc_release_calldata(task_setup_data.callback_ops,
 841                        task_setup_data.callback_data);
 842                return -EINVAL;
 843        }
 844
 845        task = rpc_run_task(&task_setup_data);
 846        if (IS_ERR(task))
 847                return PTR_ERR(task);
 848        status = task->tk_status;
 849        rpc_put_task(task);
 850        return status;
 851}
 852EXPORT_SYMBOL_GPL(rpc_call_sync);
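     /*
      * Usage sketch (illustrative only): callers wrap the procedure, argument
      * and result pointers in an rpc_message and block until the call is done.
      * "MY_PROC", "args" and "res" are hypothetical placeholders.
      *
      *     struct rpc_message msg = {
      *             .rpc_proc       = &my_rpc_program_procedures[MY_PROC],
      *             .rpc_argp       = &args,
      *             .rpc_resp       = &res,
      *     };
      *     int status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
      */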
 853
 854/**
 855 * rpc_call_async - Perform an asynchronous RPC call
 856 * @clnt: pointer to RPC client
 857 * @msg: RPC call parameters
 858 * @flags: RPC call flags
 859 * @tk_ops: RPC call ops
 860 * @data: user call data
 861 */
 862int
 863rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 864               const struct rpc_call_ops *tk_ops, void *data)
 865{
 866        struct rpc_task *task;
 867        struct rpc_task_setup task_setup_data = {
 868                .rpc_client = clnt,
 869                .rpc_message = msg,
 870                .callback_ops = tk_ops,
 871                .callback_data = data,
 872                .flags = flags|RPC_TASK_ASYNC,
 873        };
 874
 875        task = rpc_run_task(&task_setup_data);
 876        if (IS_ERR(task))
 877                return PTR_ERR(task);
 878        rpc_put_task(task);
 879        return 0;
 880}
 881EXPORT_SYMBOL_GPL(rpc_call_async);
 882
 883#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 884/**
 885 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 886 * rpc_execute against it
 887 * @req: RPC request
 888 * @tk_ops: RPC call ops
 889 */
 890struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
 891                                const struct rpc_call_ops *tk_ops)
 892{
 893        struct rpc_task *task;
 894        struct xdr_buf *xbufp = &req->rq_snd_buf;
 895        struct rpc_task_setup task_setup_data = {
 896                .callback_ops = tk_ops,
 897        };
 898
 899        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
 900        /*
 901         * Create an rpc_task to send the data
 902         */
 903        task = rpc_new_task(&task_setup_data);
 904        if (IS_ERR(task)) {
 905                xprt_free_bc_request(req);
 906                goto out;
 907        }
 908        task->tk_rqstp = req;
 909
 910        /*
 911         * Set up the xdr_buf length.
 912         * This also indicates that the buffer is XDR encoded already.
 913         */
 914        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
 915                        xbufp->tail[0].iov_len;
 916
 917        task->tk_action = call_bc_transmit;
 918        atomic_inc(&task->tk_count);
 919        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
 920        rpc_execute(task);
 921
 922out:
 923        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
 924        return task;
 925}
 926#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 927
 928void
 929rpc_call_start(struct rpc_task *task)
 930{
 931        task->tk_action = call_start;
 932}
 933EXPORT_SYMBOL_GPL(rpc_call_start);
 934
 935/**
 936 * rpc_peeraddr - extract remote peer address from clnt's xprt
 937 * @clnt: RPC client structure
 938 * @buf: target buffer
 939 * @bufsize: length of target buffer
 940 *
 941 * Returns the number of bytes that are actually in the stored address.
 942 */
 943size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
 944{
 945        size_t bytes;
 946        struct rpc_xprt *xprt;
 947
 948        rcu_read_lock();
 949        xprt = rcu_dereference(clnt->cl_xprt);
 950
 951        bytes = xprt->addrlen;
 952        if (bytes > bufsize)
 953                bytes = bufsize;
 954        memcpy(buf, &xprt->addr, bytes);
 955        rcu_read_unlock();
 956
 957        return bytes;
 958}
 959EXPORT_SYMBOL_GPL(rpc_peeraddr);
 960
 961/**
 962 * rpc_peeraddr2str - return remote peer address in printable format
 963 * @clnt: RPC client structure
 964 * @format: address format
 965 *
 966 * NB: the lifetime of the memory referenced by the returned pointer is
 967 * the same as the rpc_xprt itself.  As long as the caller uses this
 968 * pointer, it must hold the RCU read lock.
 969 */
 970const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
 971                             enum rpc_display_format_t format)
 972{
 973        struct rpc_xprt *xprt;
 974
 975        xprt = rcu_dereference(clnt->cl_xprt);
 976
 977        if (xprt->address_strings[format] != NULL)
 978                return xprt->address_strings[format];
 979        else
 980                return "unprintable";
 981}
 982EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
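     /*
      * Usage sketch: because the returned string is owned by the rpc_xprt,
      * callers bracket its use with the RCU read lock, for example:
      *
      *     rcu_read_lock();
      *     dprintk("server address: %s\n",
      *             rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
      *     rcu_read_unlock();
      */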
 983
 984static const struct sockaddr_in rpc_inaddr_loopback = {
 985        .sin_family             = AF_INET,
 986        .sin_addr.s_addr        = htonl(INADDR_ANY),
 987};
 988
 989static const struct sockaddr_in6 rpc_in6addr_loopback = {
 990        .sin6_family            = AF_INET6,
 991        .sin6_addr              = IN6ADDR_ANY_INIT,
 992};
 993
 994/*
 995 * Try a getsockname() on a connected datagram socket.  Using a
 996 * connected datagram socket prevents leaving a socket in TIME_WAIT.
 997 * This conserves the ephemeral port number space.
 998 *
 999 * Returns zero and fills in "buf" if successful; otherwise, a
1000 * negative errno is returned.
1001 */
1002static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1003                        struct sockaddr *buf, int buflen)
1004{
1005        struct socket *sock;
1006        int err;
1007
1008        err = __sock_create(net, sap->sa_family,
1009                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1010        if (err < 0) {
1011                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1012                goto out;
1013        }
1014
1015        switch (sap->sa_family) {
1016        case AF_INET:
1017                err = kernel_bind(sock,
1018                                (struct sockaddr *)&rpc_inaddr_loopback,
1019                                sizeof(rpc_inaddr_loopback));
1020                break;
1021        case AF_INET6:
1022                err = kernel_bind(sock,
1023                                (struct sockaddr *)&rpc_in6addr_loopback,
1024                                sizeof(rpc_in6addr_loopback));
1025                break;
1026        default:
1027                err = -EAFNOSUPPORT;
1028                goto out;
1029        }
1030        if (err < 0) {
1031                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1032                goto out_release;
1033        }
1034
1035        err = kernel_connect(sock, sap, salen, 0);
1036        if (err < 0) {
1037                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1038                goto out_release;
1039        }
1040
1041        err = kernel_getsockname(sock, buf, &buflen);
1042        if (err < 0) {
1043                dprintk("RPC:       getsockname failed (%d)\n", err);
1044                goto out_release;
1045        }
1046
1047        err = 0;
1048        if (buf->sa_family == AF_INET6) {
1049                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1050                sin6->sin6_scope_id = 0;
1051        }
1052        dprintk("RPC:       %s succeeded\n", __func__);
1053
1054out_release:
1055        sock_release(sock);
1056out:
1057        return err;
1058}
1059
1060/*
 1061 * Scraping a connected socket failed, so we don't have a usable
1062 * local address.  Fallback: generate an address that will prevent
1063 * the server from calling us back.
1064 *
1065 * Returns zero and fills in "buf" if successful; otherwise, a
1066 * negative errno is returned.
1067 */
1068static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1069{
1070        switch (family) {
1071        case AF_INET:
1072                if (buflen < sizeof(rpc_inaddr_loopback))
1073                        return -EINVAL;
1074                memcpy(buf, &rpc_inaddr_loopback,
1075                                sizeof(rpc_inaddr_loopback));
1076                break;
1077        case AF_INET6:
1078                if (buflen < sizeof(rpc_in6addr_loopback))
1079                        return -EINVAL;
1080                memcpy(buf, &rpc_in6addr_loopback,
1081                                sizeof(rpc_in6addr_loopback));
                     break;
 1082        default:
1083                dprintk("RPC:       %s: address family not supported\n",
1084                        __func__);
1085                return -EAFNOSUPPORT;
1086        }
1087        dprintk("RPC:       %s: succeeded\n", __func__);
1088        return 0;
1089}
1090
1091/**
1092 * rpc_localaddr - discover local endpoint address for an RPC client
1093 * @clnt: RPC client structure
1094 * @buf: target buffer
1095 * @buflen: size of target buffer, in bytes
1096 *
1097 * Returns zero and fills in "buf" and "buflen" if successful;
1098 * otherwise, a negative errno is returned.
1099 *
1100 * This works even if the underlying transport is not currently connected,
1101 * or if the upper layer never previously provided a source address.
1102 *
1103 * The result of this function call is transient: multiple calls in
1104 * succession may give different results, depending on how local
1105 * networking configuration changes over time.
1106 */
1107int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1108{
1109        struct sockaddr_storage address;
1110        struct sockaddr *sap = (struct sockaddr *)&address;
1111        struct rpc_xprt *xprt;
1112        struct net *net;
1113        size_t salen;
1114        int err;
1115
1116        rcu_read_lock();
1117        xprt = rcu_dereference(clnt->cl_xprt);
1118        salen = xprt->addrlen;
1119        memcpy(sap, &xprt->addr, salen);
1120        net = get_net(xprt->xprt_net);
1121        rcu_read_unlock();
1122
1123        rpc_set_port(sap, 0);
1124        err = rpc_sockname(net, sap, salen, buf, buflen);
1125        put_net(net);
1126        if (err != 0)
1127                /* Couldn't discover local address, return ANYADDR */
1128                return rpc_anyaddr(sap->sa_family, buf, buflen);
1129        return 0;
1130}
1131EXPORT_SYMBOL_GPL(rpc_localaddr);
1132
1133void
1134rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1135{
1136        struct rpc_xprt *xprt;
1137
1138        rcu_read_lock();
1139        xprt = rcu_dereference(clnt->cl_xprt);
1140        if (xprt->ops->set_buffer_size)
1141                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1142        rcu_read_unlock();
1143}
1144EXPORT_SYMBOL_GPL(rpc_setbufsize);
1145
1146/**
1147 * rpc_protocol - Get transport protocol number for an RPC client
1148 * @clnt: RPC client to query
1149 *
1150 */
1151int rpc_protocol(struct rpc_clnt *clnt)
1152{
1153        int protocol;
1154
1155        rcu_read_lock();
1156        protocol = rcu_dereference(clnt->cl_xprt)->prot;
1157        rcu_read_unlock();
1158        return protocol;
1159}
1160EXPORT_SYMBOL_GPL(rpc_protocol);
1161
1162/**
1163 * rpc_net_ns - Get the network namespace for this RPC client
1164 * @clnt: RPC client to query
1165 *
1166 */
1167struct net *rpc_net_ns(struct rpc_clnt *clnt)
1168{
1169        struct net *ret;
1170
1171        rcu_read_lock();
1172        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1173        rcu_read_unlock();
1174        return ret;
1175}
1176EXPORT_SYMBOL_GPL(rpc_net_ns);
1177
1178/**
1179 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1180 * @clnt: RPC client to query
1181 *
1182 * For stream transports, this is one RPC record fragment (see RFC
1183 * 1831), as we don't support multi-record requests yet.  For datagram
1184 * transports, this is the size of an IP packet minus the IP, UDP, and
1185 * RPC header sizes.
1186 */
1187size_t rpc_max_payload(struct rpc_clnt *clnt)
1188{
1189        size_t ret;
1190
1191        rcu_read_lock();
1192        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1193        rcu_read_unlock();
1194        return ret;
1195}
1196EXPORT_SYMBOL_GPL(rpc_max_payload);
1197
1198/**
1199 * rpc_force_rebind - force transport to check that remote port is unchanged
1200 * @clnt: client to rebind
1201 *
1202 */
1203void rpc_force_rebind(struct rpc_clnt *clnt)
1204{
1205        if (clnt->cl_autobind) {
1206                rcu_read_lock();
1207                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1208                rcu_read_unlock();
1209        }
1210}
1211EXPORT_SYMBOL_GPL(rpc_force_rebind);
1212
1213/*
1214 * Restart an (async) RPC call from the call_prepare state.
1215 * Usually called from within the exit handler.
1216 */
1217int
1218rpc_restart_call_prepare(struct rpc_task *task)
1219{
1220        if (RPC_ASSASSINATED(task))
1221                return 0;
1222        task->tk_action = call_start;
1223        if (task->tk_ops->rpc_call_prepare != NULL)
1224                task->tk_action = rpc_prepare_task;
1225        return 1;
1226}
1227EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1228
1229/*
1230 * Restart an (async) RPC call. Usually called from within the
1231 * exit handler.
1232 */
1233int
1234rpc_restart_call(struct rpc_task *task)
1235{
1236        if (RPC_ASSASSINATED(task))
1237                return 0;
1238        task->tk_action = call_start;
1239        return 1;
1240}
1241EXPORT_SYMBOL_GPL(rpc_restart_call);
1242
1243#ifdef RPC_DEBUG
1244static const char *rpc_proc_name(const struct rpc_task *task)
1245{
1246        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1247
1248        if (proc) {
1249                if (proc->p_name)
1250                        return proc->p_name;
1251                else
1252                        return "NULL";
1253        } else
1254                return "no proc";
1255}
1256#endif
1257
1258/*
1259 * 0.  Initial state
1260 *
1261 *     Other FSM states can be visited zero or more times, but
1262 *     this state is visited exactly once for each RPC.
1263 */
1264static void
1265call_start(struct rpc_task *task)
1266{
1267        struct rpc_clnt *clnt = task->tk_client;
1268
1269        dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1270                        clnt->cl_protname, clnt->cl_vers,
1271                        rpc_proc_name(task),
1272                        (RPC_IS_ASYNC(task) ? "async" : "sync"));
1273
1274        /* Increment call count */
1275        task->tk_msg.rpc_proc->p_count++;
1276        clnt->cl_stats->rpccnt++;
1277        task->tk_action = call_reserve;
1278}
1279
1280/*
1281 * 1.   Reserve an RPC call slot
1282 */
1283static void
1284call_reserve(struct rpc_task *task)
1285{
1286        dprint_status(task);
1287
1288        task->tk_status  = 0;
1289        task->tk_action  = call_reserveresult;
1290        xprt_reserve(task);
1291}
1292
1293/*
1294 * 1b.  Grok the result of xprt_reserve()
1295 */
1296static void
1297call_reserveresult(struct rpc_task *task)
1298{
1299        int status = task->tk_status;
1300
1301        dprint_status(task);
1302
1303        /*
1304         * After a call to xprt_reserve(), we must have either
1305         * a request slot or else an error status.
1306         */
1307        task->tk_status = 0;
1308        if (status >= 0) {
1309                if (task->tk_rqstp) {
1310                        task->tk_action = call_refresh;
1311                        return;
1312                }
1313
1314                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1315                                __func__, status);
1316                rpc_exit(task, -EIO);
1317                return;
1318        }
1319
1320        /*
1321         * Even though there was an error, we may have acquired
1322         * a request slot somehow.  Make sure not to leak it.
1323         */
1324        if (task->tk_rqstp) {
1325                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1326                                __func__, status);
1327                xprt_release(task);
1328        }
1329
1330        switch (status) {
1331        case -ENOMEM:
1332                rpc_delay(task, HZ >> 2);
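                     /* fall through: retry the slot reservation after the delay */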
1333        case -EAGAIN:   /* woken up; retry */
1334                task->tk_action = call_reserve;
1335                return;
1336        case -EIO:      /* probably a shutdown */
1337                break;
1338        default:
1339                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1340                                __func__, status);
1341                break;
1342        }
1343        rpc_exit(task, status);
1344}
1345
1346/*
1347 * 2.   Bind and/or refresh the credentials
1348 */
1349static void
1350call_refresh(struct rpc_task *task)
1351{
1352        dprint_status(task);
1353
1354        task->tk_action = call_refreshresult;
1355        task->tk_status = 0;
1356        task->tk_client->cl_stats->rpcauthrefresh++;
1357        rpcauth_refreshcred(task);
1358}
1359
1360/*
1361 * 2a.  Process the results of a credential refresh
1362 */
1363static void
1364call_refreshresult(struct rpc_task *task)
1365{
1366        int status = task->tk_status;
1367
1368        dprint_status(task);
1369
1370        task->tk_status = 0;
1371        task->tk_action = call_refresh;
1372        switch (status) {
1373        case 0:
1374                if (rpcauth_uptodatecred(task))
1375                        task->tk_action = call_allocate;
1376                return;
1377        case -ETIMEDOUT:
1378                rpc_delay(task, 3*HZ);
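                     /* fall through */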
1379        case -EKEYEXPIRED:
1380        case -EAGAIN:
1381                status = -EACCES;
1382                if (!task->tk_cred_retry)
1383                        break;
1384                task->tk_cred_retry--;
1385                dprintk("RPC: %5u %s: retry refresh creds\n",
1386                                task->tk_pid, __func__);
1387                return;
1388        }
1389        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1390                                task->tk_pid, __func__, status);
1391        rpc_exit(task, status);
1392}
1393
1394/*
1395 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1396 *      (Note: buffer memory is freed in xprt_release).
1397 */
1398static void
1399call_allocate(struct rpc_task *task)
1400{
1401        unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1402        struct rpc_rqst *req = task->tk_rqstp;
1403        struct rpc_xprt *xprt = task->tk_xprt;
1404        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1405
1406        dprint_status(task);
1407
1408        task->tk_status = 0;
1409        task->tk_action = call_bind;
1410
1411        if (req->rq_buffer)
1412                return;
1413
1414        if (proc->p_proc != 0) {
1415                BUG_ON(proc->p_arglen == 0);
1416                if (proc->p_decode != NULL)
1417                        BUG_ON(proc->p_replen == 0);
1418        }
1419
1420        /*
1421         * Calculate the size (in quads) of the RPC call
1422         * and reply headers, and convert both values
1423         * to byte sizes.
1424         */
1425        req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1426        req->rq_callsize <<= 2;
1427        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1428        req->rq_rcvsize <<= 2;
1429
1430        req->rq_buffer = xprt->ops->buf_alloc(task,
1431                                        req->rq_callsize + req->rq_rcvsize);
1432        if (req->rq_buffer != NULL)
1433                return;
1434
1435        dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1436
1437        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1438                task->tk_action = call_allocate;
1439                rpc_delay(task, HZ>>4);
1440                return;
1441        }
1442
1443        rpc_exit(task, -ERESTARTSYS);
1444}
1445
1446static inline int
1447rpc_task_need_encode(struct rpc_task *task)
1448{
1449        return task->tk_rqstp->rq_snd_buf.len == 0;
1450}
1451
1452static inline void
1453rpc_task_force_reencode(struct rpc_task *task)
1454{
1455        task->tk_rqstp->rq_snd_buf.len = 0;
1456        task->tk_rqstp->rq_bytes_sent = 0;
1457}
1458
1459static inline void
1460rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1461{
1462        buf->head[0].iov_base = start;
1463        buf->head[0].iov_len = len;
1464        buf->tail[0].iov_len = 0;
1465        buf->page_len = 0;
1466        buf->flags = 0;
1467        buf->len = 0;
1468        buf->buflen = len;
1469}
1470
1471/*
1472 * 3.   Encode arguments of an RPC call
1473 */
1474static void
1475rpc_xdr_encode(struct rpc_task *task)
1476{
1477        struct rpc_rqst *req = task->tk_rqstp;
1478        kxdreproc_t     encode;
1479        __be32          *p;
1480
1481        dprint_status(task);
1482
1483        rpc_xdr_buf_init(&req->rq_snd_buf,
1484                         req->rq_buffer,
1485                         req->rq_callsize);
1486        rpc_xdr_buf_init(&req->rq_rcv_buf,
1487                         (char *)req->rq_buffer + req->rq_callsize,
1488                         req->rq_rcvsize);
1489
1490        p = rpc_encode_header(task);
1491        if (p == NULL) {
1492                printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1493                rpc_exit(task, -EIO);
1494                return;
1495        }
1496
1497        encode = task->tk_msg.rpc_proc->p_encode;
1498        if (encode == NULL)
1499                return;
1500
1501        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1502                        task->tk_msg.rpc_argp);
1503}
1504
1505/*
1506 * 4.   Get the server port number if not yet set
1507 */
1508static void
1509call_bind(struct rpc_task *task)
1510{
1511        struct rpc_xprt *xprt = task->tk_xprt;
1512
1513        dprint_status(task);
1514
1515        task->tk_action = call_connect;
1516        if (!xprt_bound(xprt)) {
1517                task->tk_action = call_bind_status;
1518                task->tk_timeout = xprt->bind_timeout;
1519                xprt->ops->rpcbind(task);
1520        }
1521}
1522
1523/*
1524 * 4a.  Sort out bind result
1525 */
1526static void
1527call_bind_status(struct rpc_task *task)
1528{
1529        int status = -EIO;
1530
1531        if (task->tk_status >= 0) {
1532                dprint_status(task);
1533                task->tk_status = 0;
1534                task->tk_action = call_connect;
1535                return;
1536        }
1537
1538        trace_rpc_bind_status(task);
1539        switch (task->tk_status) {
1540        case -ENOMEM:
1541                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1542                rpc_delay(task, HZ >> 2);
1543                goto retry_timeout;
1544        case -EACCES:
1545                dprintk("RPC: %5u remote rpcbind: RPC program/version "
1546                                "unavailable\n", task->tk_pid);
1547                /* fail immediately if this is an RPC ping */
1548                if (task->tk_msg.rpc_proc->p_proc == 0) {
1549                        status = -EOPNOTSUPP;
1550                        break;
1551                }
1552                if (task->tk_rebind_retry == 0)
1553                        break;
1554                task->tk_rebind_retry--;
1555                rpc_delay(task, 3*HZ);
1556                goto retry_timeout;
1557        case -ETIMEDOUT:
1558                dprintk("RPC: %5u rpcbind request timed out\n",
1559                                task->tk_pid);
1560                goto retry_timeout;
1561        case -EPFNOSUPPORT:
1562                /* server doesn't support any rpcbind version we know of */
1563                dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1564                                task->tk_pid);
1565                break;
1566        case -EPROTONOSUPPORT:
1567                dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1568                                task->tk_pid);
1569                task->tk_status = 0;
1570                task->tk_action = call_bind;
1571                return;
1572        case -ECONNREFUSED:             /* connection problems */
1573        case -ECONNRESET:
1574        case -ENOTCONN:
1575        case -EHOSTDOWN:
1576        case -EHOSTUNREACH:
1577        case -ENETUNREACH:
1578        case -EPIPE:
1579                dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1580                                task->tk_pid, task->tk_status);
1581                if (!RPC_IS_SOFTCONN(task)) {
1582                        rpc_delay(task, 5*HZ);
1583                        goto retry_timeout;
1584                }
1585                status = task->tk_status;
1586                break;
1587        default:
1588                dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1589                                task->tk_pid, -task->tk_status);
1590        }
1591
1592        rpc_exit(task, status);
1593        return;
1594
1595retry_timeout:
1596        task->tk_action = call_timeout;
1597}
1598
1599/*
1600 * 4b.  Connect to the RPC server
1601 */
1602static void
1603call_connect(struct rpc_task *task)
1604{
1605        struct rpc_xprt *xprt = task->tk_xprt;
1606
1607        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1608                        task->tk_pid, xprt,
1609                        (xprt_connected(xprt) ? "is" : "is not"));
1610
1611        task->tk_action = call_transmit;
1612        if (!xprt_connected(xprt)) {
1613                task->tk_action = call_connect_status;
1614                if (task->tk_status < 0)
1615                        return;
1616                xprt_connect(task);
1617        }
1618}
1619
1620/*
1621 * 4c.  Sort out connect result
1622 */
1623static void
1624call_connect_status(struct rpc_task *task)
1625{
1626        struct rpc_clnt *clnt = task->tk_client;
1627        int status = task->tk_status;
1628
1629        dprint_status(task);
1630
1631        task->tk_status = 0;
1632        if (status >= 0 || status == -EAGAIN) {
1633                clnt->cl_stats->netreconn++;
1634                task->tk_action = call_transmit;
1635                return;
1636        }
1637
1638        trace_rpc_connect_status(task, status);
1639        switch (status) {
1640                /* if soft mounted, test if we've timed out */
1641        case -ETIMEDOUT:
1642                task->tk_action = call_timeout;
1643                break;
1644        default:
1645                rpc_exit(task, -EIO);
1646        }
1647}
1648
1649/*
1650 * 5.   Transmit the RPC request, and wait for reply
1651 */
1652static void
1653call_transmit(struct rpc_task *task)
1654{
1655        dprint_status(task);
1656
1657        task->tk_action = call_status;
1658        if (task->tk_status < 0)
1659                return;
1660        task->tk_status = xprt_prepare_transmit(task);
1661        if (task->tk_status != 0)
1662                return;
1663        task->tk_action = call_transmit_status;
1664        /* Encode here so that rpcsec_gss can use the correct sequence number. */
1665        if (rpc_task_need_encode(task)) {
1666                rpc_xdr_encode(task);
1667                /* Did the encode result in an error condition? */
1668                if (task->tk_status != 0) {
1669                        /* Was the error nonfatal? */
1670                        if (task->tk_status == -EAGAIN)
1671                                rpc_delay(task, HZ >> 4);
1672                        else
1673                                rpc_exit(task, task->tk_status);
1674                        return;
1675                }
1676        }
1677        xprt_transmit(task);
1678        if (task->tk_status < 0)
1679                return;
1680        /*
1681         * On success, ensure that we call xprt_end_transmit() before sleeping
1682         * in order to allow other RPC requests access to the socket.
1683         */
1684        call_transmit_status(task);
1685        if (rpc_reply_expected(task))
1686                return;
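            /* No reply is expected, so the call is complete once it has been sent. */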
1687        task->tk_action = rpc_exit_task;
1688        rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1689}
1690
1691/*
1692 * 5a.  Handle cleanup after a transmission
1693 */
1694static void
1695call_transmit_status(struct rpc_task *task)
1696{
1697        task->tk_action = call_status;
1698
1699        /*
1700         * Common case: success.  Force the compiler to put this
1701         * test first.
1702         */
1703        if (task->tk_status == 0) {
1704                xprt_end_transmit(task);
1705                rpc_task_force_reencode(task);
1706                return;
1707        }
1708
1709        switch (task->tk_status) {
1710        case -EAGAIN:
1711                break;
1712        default:
1713                dprint_status(task);
1714                xprt_end_transmit(task);
1715                rpc_task_force_reencode(task);
1716                break;
1717                /*
1718                 * Special cases: if we've been waiting on the
1719                 * socket's write_space() callback, or if the
1720                 * socket just returned a connection error,
1721                 * then hold onto the transport lock.
1722                 */
1723        case -ECONNREFUSED:
1724        case -EHOSTDOWN:
1725        case -EHOSTUNREACH:
1726        case -ENETUNREACH:
1727                if (RPC_IS_SOFTCONN(task)) {
1728                        xprt_end_transmit(task);
1729                        rpc_exit(task, task->tk_status);
1730                        break;
1731                }
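                    /* fall through: a hard task re-encodes and lets call_status
                     * deal with the connection error */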
1732        case -ECONNRESET:
1733        case -ENOTCONN:
1734        case -EPIPE:
1735                rpc_task_force_reencode(task);
1736        }
1737}
1738
1739#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1740/*
1741 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1742 * addition, disconnect on connectivity errors.
1743 */
1744static void
1745call_bc_transmit(struct rpc_task *task)
1746{
1747        struct rpc_rqst *req = task->tk_rqstp;
1748
1749        task->tk_status = xprt_prepare_transmit(task);
1750        if (task->tk_status == -EAGAIN) {
1751                /*
1752                 * Could not reserve the transport. Try again after the
1753                 * transport is released.
1754                 */
1755                task->tk_status = 0;
1756                task->tk_action = call_bc_transmit;
1757                return;
1758        }
1759
1760        task->tk_action = rpc_exit_task;
1761        if (task->tk_status < 0) {
1762                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1763                        "error: %d\n", task->tk_status);
1764                return;
1765        }
1766
1767        xprt_transmit(task);
1768        xprt_end_transmit(task);
1769        dprint_status(task);
1770        switch (task->tk_status) {
1771        case 0:
1772                /* Success */
1773                break;
1774        case -EHOSTDOWN:
1775        case -EHOSTUNREACH:
1776        case -ENETUNREACH:
1777        case -ETIMEDOUT:
1778                /*
1779                 * Problem reaching the server.  Disconnect and let the
1780                 * forechannel reestablish the connection.  The server will
1781                 * have to retransmit the backchannel request and we'll
1782                 * reprocess it.  Since these ops are idempotent, there's no
1783                 * need to cache our reply at this time.
1784                 */
1785                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1786                        "error: %d\n", task->tk_status);
1787                xprt_conditional_disconnect(task->tk_xprt,
1788                        req->rq_connect_cookie);
1789                break;
1790        default:
1791                /*
1792                 * We were unable to reply and will have to drop the
1793                 * request.  The server should reconnect and retransmit.
1794                 */
1795                WARN_ON_ONCE(task->tk_status == -EAGAIN);
1796                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1797                        "error: %d\n", task->tk_status);
1798                break;
1799        }
1800        rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1801}
1802#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1803
1804/*
1805 * 6.   Sort out the RPC call status
1806 */
1807static void
1808call_status(struct rpc_task *task)
1809{
1810        struct rpc_clnt *clnt = task->tk_client;
1811        struct rpc_rqst *req = task->tk_rqstp;
1812        int             status;
1813
1814        if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1815                task->tk_status = req->rq_reply_bytes_recvd;
1816
1817        dprint_status(task);
1818
1819        status = task->tk_status;
1820        if (status >= 0) {
1821                task->tk_action = call_decode;
1822                return;
1823        }
1824
1825        trace_rpc_call_status(task);
1826        task->tk_status = 0;
1827        switch(status) {
1828        case -EHOSTDOWN:
1829        case -EHOSTUNREACH:
1830        case -ENETUNREACH:
1831                /*
1832                 * Delay any retries for 3 seconds, then handle as if it
1833                 * were a timeout.
1834                 */
1835                rpc_delay(task, 3*HZ);
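                    /* fall through */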
1836        case -ETIMEDOUT:
1837                task->tk_action = call_timeout;
1838                if (task->tk_client->cl_discrtry)
1839                        xprt_conditional_disconnect(task->tk_xprt,
1840                                        req->rq_connect_cookie);
1841                break;
1842        case -ECONNRESET:
1843        case -ECONNREFUSED:
1844                rpc_force_rebind(clnt);
1845                rpc_delay(task, 3*HZ);
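                    /* fall through */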
1846        case -EPIPE:
1847        case -ENOTCONN:
1848                task->tk_action = call_bind;
1849                break;
1850        case -EAGAIN:
1851                task->tk_action = call_transmit;
1852                break;
1853        case -EIO:
1854                /* shutdown or soft timeout */
1855                rpc_exit(task, status);
1856                break;
1857        default:
1858                if (clnt->cl_chatty)
1859                        printk("%s: RPC call returned error %d\n",
1860                               clnt->cl_protname, -status);
1861                rpc_exit(task, status);
1862        }
1863}
1864
1865/*
1866 * 6a.  Handle RPC timeout
1867 *      We do not release the request slot, so we keep using the
1868 *      same XID for all retransmits.
1869 */
1870static void
1871call_timeout(struct rpc_task *task)
1872{
1873        struct rpc_clnt *clnt = task->tk_client;
1874
1875        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1876                dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1877                goto retry;
1878        }
1879
1880        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1881        task->tk_timeouts++;
1882
1883        if (RPC_IS_SOFTCONN(task)) {
1884                rpc_exit(task, -ETIMEDOUT);
1885                return;
1886        }
1887        if (RPC_IS_SOFT(task)) {
1888                if (clnt->cl_chatty) {
1889                        rcu_read_lock();
1890                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1891                                clnt->cl_protname,
1892                                rcu_dereference(clnt->cl_xprt)->servername);
1893                        rcu_read_unlock();
1894                }
1895                if (task->tk_flags & RPC_TASK_TIMEOUT)
1896                        rpc_exit(task, -ETIMEDOUT);
1897                else
1898                        rpc_exit(task, -EIO);
1899                return;
1900        }
1901
1902        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1903                task->tk_flags |= RPC_CALL_MAJORSEEN;
1904                if (clnt->cl_chatty) {
1905                        rcu_read_lock();
1906                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1907                                clnt->cl_protname,
1908                                rcu_dereference(clnt->cl_xprt)->servername);
1909                        rcu_read_unlock();
1910                }
1911        }
1912        rpc_force_rebind(clnt);
1913        /*
1914         * Did our request time out due to an RPCSEC_GSS out-of-sequence
1915         * event? RFC2203 requires the server to drop all such requests.
1916         */
1917        rpcauth_invalcred(task);
1918
1919retry:
1920        clnt->cl_stats->rpcretrans++;
1921        task->tk_action = call_bind;
1922        task->tk_status = 0;
1923}
1924
1925/*
1926 * 7.   Decode the RPC reply
1927 */
1928static void
1929call_decode(struct rpc_task *task)
1930{
1931        struct rpc_clnt *clnt = task->tk_client;
1932        struct rpc_rqst *req = task->tk_rqstp;
1933        kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1934        __be32          *p;
1935
1936        dprint_status(task);
1937
1938        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1939                if (clnt->cl_chatty) {
1940                        rcu_read_lock();
1941                        printk(KERN_NOTICE "%s: server %s OK\n",
1942                                clnt->cl_protname,
1943                                rcu_dereference(clnt->cl_xprt)->servername);
1944                        rcu_read_unlock();
1945                }
1946                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1947        }
1948
1949        /*
1950         * Ensure that we see all writes made by xprt_complete_rqst()
1951         * before it updated req->rq_reply_bytes_recvd.
1952         */
1953        smp_rmb();
1954        req->rq_rcv_buf.len = req->rq_private_buf.len;
1955
1956        /* Check that the softirq receive buffer is valid */
1957        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1958                                sizeof(req->rq_rcv_buf)) != 0);
1959
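            /* Anything shorter than xid, direction and reply status
             * (12 bytes) cannot be parsed as an RPC reply; retry. */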
1960        if (req->rq_rcv_buf.len < 12) {
1961                if (!RPC_IS_SOFT(task)) {
1962                        task->tk_action = call_bind;
1963                        clnt->cl_stats->rpcretrans++;
1964                        goto out_retry;
1965                }
1966                dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1967                                clnt->cl_protname, task->tk_status);
1968                task->tk_action = call_timeout;
1969                goto out_retry;
1970        }
1971
1972        p = rpc_verify_header(task);
1973        if (IS_ERR(p)) {
1974                if (p == ERR_PTR(-EAGAIN))
1975                        goto out_retry;
1976                return;
1977        }
1978
1979        task->tk_action = rpc_exit_task;
1980
1981        if (decode) {
1982                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1983                                                      task->tk_msg.rpc_resp);
1984        }
1985        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1986                        task->tk_status);
1987        return;
1988out_retry:
1989        task->tk_status = 0;
1990        /* Note: rpc_verify_header() may have freed the RPC slot */
1991        if (task->tk_rqstp == req) {
1992                req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1993                if (task->tk_client->cl_discrtry)
1994                        xprt_conditional_disconnect(task->tk_xprt,
1995                                        req->rq_connect_cookie);
1996        }
1997}
1998
1999static __be32 *
2000rpc_encode_header(struct rpc_task *task)
2001{
2002        struct rpc_clnt *clnt = task->tk_client;
2003        struct rpc_rqst *req = task->tk_rqstp;
2004        __be32          *p = req->rq_svec[0].iov_base;
2005
2006        /* FIXME: check buffer size? */
2007
2008        p = xprt_skip_transport_header(task->tk_xprt, p);
2009        *p++ = req->rq_xid;             /* XID */
2010        *p++ = htonl(RPC_CALL);         /* CALL */
2011        *p++ = htonl(RPC_VERSION);      /* RPC version */
2012        *p++ = htonl(clnt->cl_prog);    /* program number */
2013        *p++ = htonl(clnt->cl_vers);    /* program version */
2014        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2015        p = rpcauth_marshcred(task, p);
2016        req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2017        return p;
2018}
2019
2020static __be32 *
2021rpc_verify_header(struct rpc_task *task)
2022{
2023        struct rpc_clnt *clnt = task->tk_client;
2024        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2025        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2026        __be32  *p = iov->iov_base;
2027        u32 n;
2028        int error = -EACCES;
2029
2030        if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2031                /* RFC 1014 says that the representation of XDR data must be a
2032                 * multiple of four bytes
2033                 * - if it isn't, pointer subtraction in the NFS client may give
2034                 *   undefined results
2035                 */
2036                dprintk("RPC: %5u %s: XDR representation not a multiple of"
2037                       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2038                       task->tk_rqstp->rq_rcv_buf.len);
2039                goto out_eio;
2040        }
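            /* Need at least the xid, the message direction and the reply status */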
2041        if ((len -= 3) < 0)
2042                goto out_overflow;
2043
2044        p += 1; /* skip XID */
2045        if ((n = ntohl(*p++)) != RPC_REPLY) {
2046                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2047                        task->tk_pid, __func__, n);
2048                goto out_garbage;
2049        }
2050
2051        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2052                if (--len < 0)
2053                        goto out_overflow;
2054                switch ((n = ntohl(*p++))) {
2055                case RPC_AUTH_ERROR:
2056                        break;
2057                case RPC_MISMATCH:
2058                        dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2059                                task->tk_pid, __func__);
2060                        error = -EPROTONOSUPPORT;
2061                        goto out_err;
2062                default:
2063                        dprintk("RPC: %5u %s: RPC call rejected, "
2064                                "unknown error: %x\n",
2065                                task->tk_pid, __func__, n);
2066                        goto out_eio;
2067                }
2068                if (--len < 0)
2069                        goto out_overflow;
2070                switch ((n = ntohl(*p++))) {
2071                case RPC_AUTH_REJECTEDCRED:
2072                case RPC_AUTH_REJECTEDVERF:
2073                case RPCSEC_GSS_CREDPROBLEM:
2074                case RPCSEC_GSS_CTXPROBLEM:
2075                        if (!task->tk_cred_retry)
2076                                break;
2077                        task->tk_cred_retry--;
2078                        dprintk("RPC: %5u %s: retry stale creds\n",
2079                                        task->tk_pid, __func__);
2080                        rpcauth_invalcred(task);
2081                        /* Ensure we obtain a new XID! */
2082                        xprt_release(task);
2083                        task->tk_action = call_reserve;
2084                        goto out_retry;
2085                case RPC_AUTH_BADCRED:
2086                case RPC_AUTH_BADVERF:
2087                        /* possibly garbled cred/verf? */
2088                        if (!task->tk_garb_retry)
2089                                break;
2090                        task->tk_garb_retry--;
2091                        dprintk("RPC: %5u %s: retry garbled creds\n",
2092                                        task->tk_pid, __func__);
2093                        task->tk_action = call_bind;
2094                        goto out_retry;
2095                case RPC_AUTH_TOOWEAK:
2096                        rcu_read_lock();
2097                        printk(KERN_NOTICE "RPC: server %s requires stronger "
2098                               "authentication.\n",
2099                               rcu_dereference(clnt->cl_xprt)->servername);
2100                        rcu_read_unlock();
2101                        break;
2102                default:
2103                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
2104                                        task->tk_pid, __func__, n);
2105                        error = -EIO;
2106                }
2107                dprintk("RPC: %5u %s: call rejected %d\n",
2108                                task->tk_pid, __func__, n);
2109                goto out_err;
2110        }
2111        if (!(p = rpcauth_checkverf(task, p))) {
2112                dprintk("RPC: %5u %s: auth check failed\n",
2113                                task->tk_pid, __func__);
2114                goto out_garbage;               /* bad verifier, retry */
2115        }
2116        len = p - (__be32 *)iov->iov_base - 1;
2117        if (len < 0)
2118                goto out_overflow;
2119        switch ((n = ntohl(*p++))) {
2120        case RPC_SUCCESS:
2121                return p;
2122        case RPC_PROG_UNAVAIL:
2123                dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2124                                "by server %s\n", task->tk_pid, __func__,
2125                                (unsigned int)clnt->cl_prog,
2126                                rcu_dereference(clnt->cl_xprt)->servername);
2127                error = -EPFNOSUPPORT;
2128                goto out_err;
2129        case RPC_PROG_MISMATCH:
2130                dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2131                                "by server %s\n", task->tk_pid, __func__,
2132                                (unsigned int)clnt->cl_prog,
2133                                (unsigned int)clnt->cl_vers,
2134                                rcu_dereference(clnt->cl_xprt)->servername);
2135                error = -EPROTONOSUPPORT;
2136                goto out_err;
2137        case RPC_PROC_UNAVAIL:
2138                dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2139                                "version %u on server %s\n",
2140                                task->tk_pid, __func__,
2141                                rpc_proc_name(task),
2142                                clnt->cl_prog, clnt->cl_vers,
2143                                rcu_dereference(clnt->cl_xprt)->servername);
2144                error = -EOPNOTSUPP;
2145                goto out_err;
2146        case RPC_GARBAGE_ARGS:
2147                dprintk("RPC: %5u %s: server saw garbage\n",
2148                                task->tk_pid, __func__);
2149                break;                  /* retry */
2150        default:
2151                dprintk("RPC: %5u %s: server accept status: %x\n",
2152                                task->tk_pid, __func__, n);
2153                /* Also retry */
2154        }
2155
2156out_garbage:
2157        clnt->cl_stats->rpcgarbage++;
2158        if (task->tk_garb_retry) {
2159                task->tk_garb_retry--;
2160                dprintk("RPC: %5u %s: retrying\n",
2161                                task->tk_pid, __func__);
2162                task->tk_action = call_bind;
2163out_retry:
2164                return ERR_PTR(-EAGAIN);
2165        }
2166out_eio:
2167        error = -EIO;
2168out_err:
2169        rpc_exit(task, error);
2170        dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2171                        __func__, error);
2172        return ERR_PTR(error);
2173out_overflow:
2174        dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2175                        __func__);
2176        goto out_garbage;
2177}
2178
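/* The NULL procedure (procedure number 0) takes no arguments and returns no results. */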
2179static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2180{
2181}
2182
2183static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2184{
2185        return 0;
2186}
2187
2188static struct rpc_procinfo rpcproc_null = {
2189        .p_encode = rpcproc_encode_null,
2190        .p_decode = rpcproc_decode_null,
2191};
2192
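/*
 * Probe the server with a NULL call using AUTH_NULL credentials.  The call
 * is soft so that an unresponsive or unreachable server fails the ping
 * quickly instead of retrying forever.
 */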
2193static int rpc_ping(struct rpc_clnt *clnt)
2194{
2195        struct rpc_message msg = {
2196                .rpc_proc = &rpcproc_null,
2197        };
2198        int err;
2199        msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2200        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2201        put_rpccred(msg.rpc_cred);
2202        return err;
2203}
2204
2205struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2206{
2207        struct rpc_message msg = {
2208                .rpc_proc = &rpcproc_null,
2209                .rpc_cred = cred,
2210        };
2211        struct rpc_task_setup task_setup_data = {
2212                .rpc_client = clnt,
2213                .rpc_message = &msg,
2214                .callback_ops = &rpc_default_ops,
2215                .flags = flags,
2216        };
2217        return rpc_run_task(&task_setup_data);
2218}
2219EXPORT_SYMBOL_GPL(rpc_call_null);
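/*
 * A minimal usage sketch (not taken from an in-tree caller): probe a server
 * synchronously with a NULL call.  It assumes the caller already holds a
 * valid rpc_clnt; passing a NULL cred lets the RPC layer bind the client's
 * default credential, and rpc_put_task() drops the reference returned by
 * rpc_run_task():
 *
 *      struct rpc_task *task;
 *      int status;
 *
 *      task = rpc_call_null(clnt, NULL, RPC_TASK_SOFT);
 *      if (IS_ERR(task))
 *              return PTR_ERR(task);
 *      status = task->tk_status;
 *      rpc_put_task(task);
 *      return status;
 */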
2220
2221#ifdef RPC_DEBUG
2222static void rpc_show_header(void)
2223{
2224        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2225                "-timeout ---ops--\n");
2226}
2227
2228static void rpc_show_task(const struct rpc_clnt *clnt,
2229                          const struct rpc_task *task)
2230{
2231        const char *rpc_waitq = "none";
2232
2233        if (RPC_IS_QUEUED(task))
2234                rpc_waitq = rpc_qname(task->tk_waitqueue);
2235
2236        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2237                task->tk_pid, task->tk_flags, task->tk_status,
2238                clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2239                clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2240                task->tk_action, rpc_waitq);
2241}
2242
2243void rpc_show_tasks(struct net *net)
2244{
2245        struct rpc_clnt *clnt;
2246        struct rpc_task *task;
2247        int header = 0;
2248        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2249
2250        spin_lock(&sn->rpc_client_lock);
2251        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2252                spin_lock(&clnt->cl_lock);
2253                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2254                        if (!header) {
2255                                rpc_show_header();
2256                                header++;
2257                        }
2258                        rpc_show_task(clnt, task);
2259                }
2260                spin_unlock(&clnt->cl_lock);
2261        }
2262        spin_unlock(&sn->rpc_client_lock);
2263}
2264#endif
2265