   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 *  linux/net/sunrpc/clnt.c
   4 *
   5 *  This file contains the high-level RPC interface.
   6 *  It is modeled as a finite state machine to support both synchronous
   7 *  and asynchronous requests.
   8 *
   9 *  -   RPC header generation and argument serialization.
  10 *  -   Credential refresh.
  11 *  -   TCP connect handling.
  12 *  -   Retry of operation when it is suspected the operation failed because
  13 *      of uid squashing on the server, or when the credentials were stale
  14 *      and need to be refreshed, or when a packet was damaged in transit.
   15 *      This may have to be moved to the VFS layer.
  16 *
  17 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
  18 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
  19 */
  20
  21
  22#include <linux/module.h>
  23#include <linux/types.h>
  24#include <linux/kallsyms.h>
  25#include <linux/mm.h>
  26#include <linux/namei.h>
  27#include <linux/mount.h>
  28#include <linux/slab.h>
  29#include <linux/rcupdate.h>
  30#include <linux/utsname.h>
  31#include <linux/workqueue.h>
  32#include <linux/in.h>
  33#include <linux/in6.h>
  34#include <linux/un.h>
  35
  36#include <linux/sunrpc/clnt.h>
  37#include <linux/sunrpc/addr.h>
  38#include <linux/sunrpc/rpc_pipe_fs.h>
  39#include <linux/sunrpc/metrics.h>
  40#include <linux/sunrpc/bc_xprt.h>
  41#include <trace/events/sunrpc.h>
  42
  43#include "sunrpc.h"
  44#include "sysfs.h"
  45#include "netns.h"
  46
  47#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  48# define RPCDBG_FACILITY        RPCDBG_CALL
  49#endif
  50
   51/*
   52 * All RPC clients are linked into a per-netns list (see rpc_register_client()).
   53 */
  54
  55static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
  56
  57
  58static void     call_start(struct rpc_task *task);
  59static void     call_reserve(struct rpc_task *task);
  60static void     call_reserveresult(struct rpc_task *task);
  61static void     call_allocate(struct rpc_task *task);
  62static void     call_encode(struct rpc_task *task);
  63static void     call_decode(struct rpc_task *task);
  64static void     call_bind(struct rpc_task *task);
  65static void     call_bind_status(struct rpc_task *task);
  66static void     call_transmit(struct rpc_task *task);
  67static void     call_status(struct rpc_task *task);
  68static void     call_transmit_status(struct rpc_task *task);
  69static void     call_refresh(struct rpc_task *task);
  70static void     call_refreshresult(struct rpc_task *task);
  71static void     call_connect(struct rpc_task *task);
  72static void     call_connect_status(struct rpc_task *task);
  73
  74static int      rpc_encode_header(struct rpc_task *task,
  75                                  struct xdr_stream *xdr);
  76static int      rpc_decode_header(struct rpc_task *task,
  77                                  struct xdr_stream *xdr);
  78static int      rpc_ping(struct rpc_clnt *clnt);
  79static void     rpc_check_timeout(struct rpc_task *task);
  80
  81static void rpc_register_client(struct rpc_clnt *clnt)
  82{
  83        struct net *net = rpc_net_ns(clnt);
  84        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  85
  86        spin_lock(&sn->rpc_client_lock);
  87        list_add(&clnt->cl_clients, &sn->all_clients);
  88        spin_unlock(&sn->rpc_client_lock);
  89}
  90
  91static void rpc_unregister_client(struct rpc_clnt *clnt)
  92{
  93        struct net *net = rpc_net_ns(clnt);
  94        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
  95
  96        spin_lock(&sn->rpc_client_lock);
  97        list_del(&clnt->cl_clients);
  98        spin_unlock(&sn->rpc_client_lock);
  99}
 100
 101static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 102{
 103        rpc_remove_client_dir(clnt);
 104}
 105
 106static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
 107{
 108        struct net *net = rpc_net_ns(clnt);
 109        struct super_block *pipefs_sb;
 110
 111        pipefs_sb = rpc_get_sb_net(net);
 112        if (pipefs_sb) {
 113                __rpc_clnt_remove_pipedir(clnt);
 114                rpc_put_sb_net(net);
 115        }
 116}
 117
 118static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
 119                                    struct rpc_clnt *clnt)
 120{
 121        static uint32_t clntid;
 122        const char *dir_name = clnt->cl_program->pipe_dir_name;
 123        char name[15];
 124        struct dentry *dir, *dentry;
 125
 126        dir = rpc_d_lookup_sb(sb, dir_name);
 127        if (dir == NULL) {
 128                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
 129                return dir;
 130        }
 131        for (;;) {
 132                snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
 133                name[sizeof(name) - 1] = '\0';
 134                dentry = rpc_create_client_dir(dir, name, clnt);
 135                if (!IS_ERR(dentry))
 136                        break;
 137                if (dentry == ERR_PTR(-EEXIST))
 138                        continue;
 139                printk(KERN_INFO "RPC: Couldn't create pipefs entry"
 140                                " %s/%s, error %ld\n",
 141                                dir_name, name, PTR_ERR(dentry));
 142                break;
 143        }
 144        dput(dir);
 145        return dentry;
 146}
 147
 148static int
 149rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
 150{
 151        struct dentry *dentry;
 152
 153        if (clnt->cl_program->pipe_dir_name != NULL) {
 154                dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
 155                if (IS_ERR(dentry))
 156                        return PTR_ERR(dentry);
 157        }
 158        return 0;
 159}
 160
 161static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
 162{
 163        if (clnt->cl_program->pipe_dir_name == NULL)
 164                return 1;
 165
 166        switch (event) {
 167        case RPC_PIPEFS_MOUNT:
 168                if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
 169                        return 1;
 170                if (atomic_read(&clnt->cl_count) == 0)
 171                        return 1;
 172                break;
 173        case RPC_PIPEFS_UMOUNT:
 174                if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
 175                        return 1;
 176                break;
 177        }
 178        return 0;
 179}
 180
 181static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 182                                   struct super_block *sb)
 183{
 184        struct dentry *dentry;
 185
 186        switch (event) {
 187        case RPC_PIPEFS_MOUNT:
 188                dentry = rpc_setup_pipedir_sb(sb, clnt);
 189                if (!dentry)
 190                        return -ENOENT;
 191                if (IS_ERR(dentry))
 192                        return PTR_ERR(dentry);
 193                break;
 194        case RPC_PIPEFS_UMOUNT:
 195                __rpc_clnt_remove_pipedir(clnt);
 196                break;
 197        default:
 198                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 199                return -ENOTSUPP;
 200        }
 201        return 0;
 202}
 203
 204static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
 205                                struct super_block *sb)
 206{
 207        int error = 0;
 208
 209        for (;; clnt = clnt->cl_parent) {
 210                if (!rpc_clnt_skip_event(clnt, event))
 211                        error = __rpc_clnt_handle_event(clnt, event, sb);
 212                if (error || clnt == clnt->cl_parent)
 213                        break;
 214        }
 215        return error;
 216}
 217
 218static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
 219{
 220        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 221        struct rpc_clnt *clnt;
 222
 223        spin_lock(&sn->rpc_client_lock);
 224        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
 225                if (rpc_clnt_skip_event(clnt, event))
 226                        continue;
 227                spin_unlock(&sn->rpc_client_lock);
 228                return clnt;
 229        }
 230        spin_unlock(&sn->rpc_client_lock);
 231        return NULL;
 232}
 233
 234static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
 235                            void *ptr)
 236{
 237        struct super_block *sb = ptr;
 238        struct rpc_clnt *clnt;
 239        int error = 0;
 240
 241        while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
 242                error = __rpc_pipefs_event(clnt, event, sb);
 243                if (error)
 244                        break;
 245        }
 246        return error;
 247}
 248
 249static struct notifier_block rpc_clients_block = {
 250        .notifier_call  = rpc_pipefs_event,
 251        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
 252};
 253
 254int rpc_clients_notifier_register(void)
 255{
 256        return rpc_pipefs_notifier_register(&rpc_clients_block);
 257}
 258
 259void rpc_clients_notifier_unregister(void)
 260{
 261        return rpc_pipefs_notifier_unregister(&rpc_clients_block);
 262}
 263
 264static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
 265                struct rpc_xprt *xprt,
 266                const struct rpc_timeout *timeout)
 267{
 268        struct rpc_xprt *old;
 269
 270        spin_lock(&clnt->cl_lock);
 271        old = rcu_dereference_protected(clnt->cl_xprt,
 272                        lockdep_is_held(&clnt->cl_lock));
 273
 274        if (!xprt_bound(xprt))
 275                clnt->cl_autobind = 1;
 276
 277        clnt->cl_timeout = timeout;
 278        rcu_assign_pointer(clnt->cl_xprt, xprt);
 279        spin_unlock(&clnt->cl_lock);
 280
 281        return old;
 282}
 283
 284static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
 285{
 286        clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
 287                        nodename, sizeof(clnt->cl_nodename));
 288}
 289
 290static int rpc_client_register(struct rpc_clnt *clnt,
 291                               rpc_authflavor_t pseudoflavor,
 292                               const char *client_name)
 293{
 294        struct rpc_auth_create_args auth_args = {
 295                .pseudoflavor = pseudoflavor,
 296                .target_name = client_name,
 297        };
 298        struct rpc_auth *auth;
 299        struct net *net = rpc_net_ns(clnt);
 300        struct super_block *pipefs_sb;
 301        int err;
 302
 303        rpc_clnt_debugfs_register(clnt);
 304
 305        pipefs_sb = rpc_get_sb_net(net);
 306        if (pipefs_sb) {
 307                err = rpc_setup_pipedir(pipefs_sb, clnt);
 308                if (err)
 309                        goto out;
 310        }
 311
 312        rpc_register_client(clnt);
 313        if (pipefs_sb)
 314                rpc_put_sb_net(net);
 315
 316        auth = rpcauth_create(&auth_args, clnt);
 317        if (IS_ERR(auth)) {
 318                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
 319                                pseudoflavor);
 320                err = PTR_ERR(auth);
 321                goto err_auth;
 322        }
 323        return 0;
 324err_auth:
 325        pipefs_sb = rpc_get_sb_net(net);
 326        rpc_unregister_client(clnt);
 327        __rpc_clnt_remove_pipedir(clnt);
 328out:
 329        if (pipefs_sb)
 330                rpc_put_sb_net(net);
 331        rpc_sysfs_client_destroy(clnt);
 332        rpc_clnt_debugfs_unregister(clnt);
 333        return err;
 334}
 335
 336static DEFINE_IDA(rpc_clids);
 337
 338void rpc_cleanup_clids(void)
 339{
 340        ida_destroy(&rpc_clids);
 341}
 342
 343static int rpc_alloc_clid(struct rpc_clnt *clnt)
 344{
 345        int clid;
 346
 347        clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
 348        if (clid < 0)
 349                return clid;
 350        clnt->cl_clid = clid;
 351        return 0;
 352}
 353
 354static void rpc_free_clid(struct rpc_clnt *clnt)
 355{
 356        ida_simple_remove(&rpc_clids, clnt->cl_clid);
 357}
 358
 359static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
 360                struct rpc_xprt_switch *xps,
 361                struct rpc_xprt *xprt,
 362                struct rpc_clnt *parent)
 363{
 364        const struct rpc_program *program = args->program;
 365        const struct rpc_version *version;
 366        struct rpc_clnt *clnt = NULL;
 367        const struct rpc_timeout *timeout;
 368        const char *nodename = args->nodename;
 369        int err;
 370
 371        err = rpciod_up();
 372        if (err)
 373                goto out_no_rpciod;
 374
 375        err = -EINVAL;
 376        if (args->version >= program->nrvers)
 377                goto out_err;
 378        version = program->version[args->version];
 379        if (version == NULL)
 380                goto out_err;
 381
 382        err = -ENOMEM;
 383        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 384        if (!clnt)
 385                goto out_err;
 386        clnt->cl_parent = parent ? : clnt;
 387
 388        err = rpc_alloc_clid(clnt);
 389        if (err)
 390                goto out_no_clid;
 391
 392        clnt->cl_cred     = get_cred(args->cred);
 393        clnt->cl_procinfo = version->procs;
 394        clnt->cl_maxproc  = version->nrprocs;
 395        clnt->cl_prog     = args->prognumber ? : program->number;
 396        clnt->cl_vers     = version->number;
 397        clnt->cl_stats    = program->stats;
 398        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 399        rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
 400        err = -ENOMEM;
 401        if (clnt->cl_metrics == NULL)
 402                goto out_no_stats;
 403        clnt->cl_program  = program;
 404        INIT_LIST_HEAD(&clnt->cl_tasks);
 405        spin_lock_init(&clnt->cl_lock);
 406
 407        timeout = xprt->timeout;
 408        if (args->timeout != NULL) {
 409                memcpy(&clnt->cl_timeout_default, args->timeout,
 410                                sizeof(clnt->cl_timeout_default));
 411                timeout = &clnt->cl_timeout_default;
 412        }
 413
 414        rpc_clnt_set_transport(clnt, xprt, timeout);
 415        xprt->main = true;
 416        xprt_iter_init(&clnt->cl_xpi, xps);
 417        xprt_switch_put(xps);
 418
 419        clnt->cl_rtt = &clnt->cl_rtt_default;
 420        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 421
 422        atomic_set(&clnt->cl_count, 1);
 423
 424        if (nodename == NULL)
 425                nodename = utsname()->nodename;
 426        /* save the nodename */
 427        rpc_clnt_set_nodename(clnt, nodename);
 428
 429        rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
 430        err = rpc_client_register(clnt, args->authflavor, args->client_name);
 431        if (err)
 432                goto out_no_path;
 433        if (parent)
 434                atomic_inc(&parent->cl_count);
 435
 436        trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
 437        return clnt;
 438
 439out_no_path:
 440        rpc_free_iostats(clnt->cl_metrics);
 441out_no_stats:
 442        put_cred(clnt->cl_cred);
 443        rpc_free_clid(clnt);
 444out_no_clid:
 445        kfree(clnt);
 446out_err:
 447        rpciod_down();
 448out_no_rpciod:
 449        xprt_switch_put(xps);
 450        xprt_put(xprt);
 451        trace_rpc_clnt_new_err(program->name, args->servername, err);
 452        return ERR_PTR(err);
 453}
 454
 455static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
 456                                        struct rpc_xprt *xprt)
 457{
 458        struct rpc_clnt *clnt = NULL;
 459        struct rpc_xprt_switch *xps;
 460
 461        if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
 462                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 463                xps = args->bc_xprt->xpt_bc_xps;
 464                xprt_switch_get(xps);
 465        } else {
 466                xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 467                if (xps == NULL) {
 468                        xprt_put(xprt);
 469                        return ERR_PTR(-ENOMEM);
 470                }
 471                if (xprt->bc_xprt) {
 472                        xprt_switch_get(xps);
 473                        xprt->bc_xprt->xpt_bc_xps = xps;
 474                }
 475        }
 476        clnt = rpc_new_client(args, xps, xprt, NULL);
 477        if (IS_ERR(clnt))
 478                return clnt;
 479
 480        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 481                int err = rpc_ping(clnt);
 482                if (err != 0) {
 483                        rpc_shutdown_client(clnt);
 484                        return ERR_PTR(err);
 485                }
 486        }
 487
 488        clnt->cl_softrtry = 1;
 489        if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
 490                clnt->cl_softrtry = 0;
 491                if (args->flags & RPC_CLNT_CREATE_SOFTERR)
 492                        clnt->cl_softerr = 1;
 493        }
 494
 495        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 496                clnt->cl_autobind = 1;
 497        if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
 498                clnt->cl_noretranstimeo = 1;
 499        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 500                clnt->cl_discrtry = 1;
 501        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 502                clnt->cl_chatty = 1;
 503
 504        return clnt;
 505}
 506
 507/**
 508 * rpc_create - create an RPC client and transport with one call
 509 * @args: rpc_clnt create argument structure
 510 *
 511 * Creates and initializes an RPC transport and an RPC client.
 512 *
 513 * It can ping the server in order to determine if it is up, and to see if
 514 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 515 * this behavior so asynchronous tasks can also use rpc_create.
 516 */
 517struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 518{
 519        struct rpc_xprt *xprt;
 520        struct xprt_create xprtargs = {
 521                .net = args->net,
 522                .ident = args->protocol,
 523                .srcaddr = args->saddress,
 524                .dstaddr = args->address,
 525                .addrlen = args->addrsize,
 526                .servername = args->servername,
 527                .bc_xprt = args->bc_xprt,
 528        };
 529        char servername[48];
 530        struct rpc_clnt *clnt;
 531        int i;
 532
 533        if (args->bc_xprt) {
 534                WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
 535                xprt = args->bc_xprt->xpt_bc_xprt;
 536                if (xprt) {
 537                        xprt_get(xprt);
 538                        return rpc_create_xprt(args, xprt);
 539                }
 540        }
 541
 542        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
 543                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
 544        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
 545                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
 546        /*
 547         * If the caller chooses not to specify a hostname, whip
 548         * up a string representation of the passed-in address.
 549         */
 550        if (xprtargs.servername == NULL) {
 551                struct sockaddr_un *sun =
 552                                (struct sockaddr_un *)args->address;
 553                struct sockaddr_in *sin =
 554                                (struct sockaddr_in *)args->address;
 555                struct sockaddr_in6 *sin6 =
 556                                (struct sockaddr_in6 *)args->address;
 557
 558                servername[0] = '\0';
 559                switch (args->address->sa_family) {
 560                case AF_LOCAL:
 561                        snprintf(servername, sizeof(servername), "%s",
 562                                 sun->sun_path);
 563                        break;
 564                case AF_INET:
 565                        snprintf(servername, sizeof(servername), "%pI4",
 566                                 &sin->sin_addr.s_addr);
 567                        break;
 568                case AF_INET6:
 569                        snprintf(servername, sizeof(servername), "%pI6",
 570                                 &sin6->sin6_addr);
 571                        break;
 572                default:
 573                        /* caller wants default server name, but
 574                         * address family isn't recognized. */
 575                        return ERR_PTR(-EINVAL);
 576                }
 577                xprtargs.servername = servername;
 578        }
 579
 580        xprt = xprt_create_transport(&xprtargs);
 581        if (IS_ERR(xprt))
 582                return (struct rpc_clnt *)xprt;
 583
 584        /*
  585         * By default, the kernel RPC client connects from a reserved port.
 586         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 587         * but it is always enabled for rpciod, which handles the connect
 588         * operation.
 589         */
 590        xprt->resvport = 1;
 591        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 592                xprt->resvport = 0;
 593        xprt->reuseport = 0;
 594        if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
 595                xprt->reuseport = 1;
 596
 597        clnt = rpc_create_xprt(args, xprt);
 598        if (IS_ERR(clnt) || args->nconnect <= 1)
 599                return clnt;
 600
 601        for (i = 0; i < args->nconnect - 1; i++) {
 602                if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
 603                        break;
 604        }
 605        return clnt;
 606}
 607EXPORT_SYMBOL_GPL(rpc_create);
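
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * a typical caller fills in struct rpc_create_args and calls rpc_create().
 * "example_program" and "example_addr" are hypothetical placeholders; the
 * field and flag names come from the code above.
 *
 *	struct rpc_create_args args = {
 *		.net		= &init_net,
 *		.protocol	= XPRT_TRANSPORT_TCP,
 *		.address	= (struct sockaddr *)&example_addr,
 *		.addrsize	= sizeof(example_addr),
 *		.servername	= "example-server",
 *		.program	= &example_program,
 *		.version	= 3,
 *		.authflavor	= RPC_AUTH_UNIX,
 *		.flags		= RPC_CLNT_CREATE_NOPING,
 *	};
 *	struct rpc_clnt *clnt = rpc_create(&args);
 *
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 */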
 608
 609/*
 610 * This function clones the RPC client structure. It allows us to share the
 611 * same transport while varying parameters such as the authentication
 612 * flavour.
 613 */
 614static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
 615                                           struct rpc_clnt *clnt)
 616{
 617        struct rpc_xprt_switch *xps;
 618        struct rpc_xprt *xprt;
 619        struct rpc_clnt *new;
 620        int err;
 621
 622        err = -ENOMEM;
 623        rcu_read_lock();
 624        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
 625        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 626        rcu_read_unlock();
 627        if (xprt == NULL || xps == NULL) {
 628                xprt_put(xprt);
 629                xprt_switch_put(xps);
 630                goto out_err;
 631        }
 632        args->servername = xprt->servername;
 633        args->nodename = clnt->cl_nodename;
 634
 635        new = rpc_new_client(args, xps, xprt, clnt);
 636        if (IS_ERR(new))
 637                return new;
 638
 639        /* Turn off autobind on clones */
 640        new->cl_autobind = 0;
 641        new->cl_softrtry = clnt->cl_softrtry;
 642        new->cl_softerr = clnt->cl_softerr;
 643        new->cl_noretranstimeo = clnt->cl_noretranstimeo;
 644        new->cl_discrtry = clnt->cl_discrtry;
 645        new->cl_chatty = clnt->cl_chatty;
 646        new->cl_principal = clnt->cl_principal;
 647        return new;
 648
 649out_err:
 650        trace_rpc_clnt_clone_err(clnt, err);
 651        return ERR_PTR(err);
 652}
 653
 654/**
 655 * rpc_clone_client - Clone an RPC client structure
 656 *
 657 * @clnt: RPC client whose parameters are copied
 658 *
 659 * Returns a fresh RPC client or an ERR_PTR.
 660 */
 661struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
 662{
 663        struct rpc_create_args args = {
 664                .program        = clnt->cl_program,
 665                .prognumber     = clnt->cl_prog,
 666                .version        = clnt->cl_vers,
 667                .authflavor     = clnt->cl_auth->au_flavor,
 668                .cred           = clnt->cl_cred,
 669        };
 670        return __rpc_clone_client(&args, clnt);
 671}
 672EXPORT_SYMBOL_GPL(rpc_clone_client);
 673
 674/**
 675 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 676 *
 677 * @clnt: RPC client whose parameters are copied
 678 * @flavor: security flavor for new client
 679 *
 680 * Returns a fresh RPC client or an ERR_PTR.
 681 */
 682struct rpc_clnt *
 683rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 684{
 685        struct rpc_create_args args = {
 686                .program        = clnt->cl_program,
 687                .prognumber     = clnt->cl_prog,
 688                .version        = clnt->cl_vers,
 689                .authflavor     = flavor,
 690                .cred           = clnt->cl_cred,
 691        };
 692        return __rpc_clone_client(&args, clnt);
 693}
 694EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
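
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * cloning an existing client so the same transport can be used under a
 * different security flavor.  Error handling is trimmed for brevity.
 *
 *	struct rpc_clnt *unix_clnt;
 *
 *	unix_clnt = rpc_clone_client_set_auth(clnt, RPC_AUTH_UNIX);
 *	if (IS_ERR(unix_clnt))
 *		return PTR_ERR(unix_clnt);
 *	... use unix_clnt, then release it ...
 *	rpc_shutdown_client(unix_clnt);
 */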
 695
 696/**
 697 * rpc_switch_client_transport - switch the RPC transport on the fly
 698 * @clnt: pointer to a struct rpc_clnt
 699 * @args: pointer to the new transport arguments
 700 * @timeout: pointer to the new timeout parameters
 701 *
 702 * This function allows the caller to switch the RPC transport for the
 703 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 704 * server, for instance.  It assumes that the caller has ensured that
 705 * there are no active RPC tasks by using some form of locking.
 706 *
 707 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 708 * negative errno is returned, and "clnt" continues to use the old
 709 * xprt.
 710 */
 711int rpc_switch_client_transport(struct rpc_clnt *clnt,
 712                struct xprt_create *args,
 713                const struct rpc_timeout *timeout)
 714{
 715        const struct rpc_timeout *old_timeo;
 716        rpc_authflavor_t pseudoflavor;
 717        struct rpc_xprt_switch *xps, *oldxps;
 718        struct rpc_xprt *xprt, *old;
 719        struct rpc_clnt *parent;
 720        int err;
 721
 722        xprt = xprt_create_transport(args);
 723        if (IS_ERR(xprt))
 724                return PTR_ERR(xprt);
 725
 726        xps = xprt_switch_alloc(xprt, GFP_KERNEL);
 727        if (xps == NULL) {
 728                xprt_put(xprt);
 729                return -ENOMEM;
 730        }
 731
 732        pseudoflavor = clnt->cl_auth->au_flavor;
 733
 734        old_timeo = clnt->cl_timeout;
 735        old = rpc_clnt_set_transport(clnt, xprt, timeout);
 736        oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 737
 738        rpc_unregister_client(clnt);
 739        __rpc_clnt_remove_pipedir(clnt);
 740        rpc_sysfs_client_destroy(clnt);
 741        rpc_clnt_debugfs_unregister(clnt);
 742
 743        /*
 744         * A new transport was created.  "clnt" therefore
 745         * becomes the root of a new cl_parent tree.  clnt's
 746         * children, if it has any, still point to the old xprt.
 747         */
 748        parent = clnt->cl_parent;
 749        clnt->cl_parent = clnt;
 750
 751        /*
 752         * The old rpc_auth cache cannot be re-used.  GSS
 753         * contexts in particular are between a single
 754         * client and server.
 755         */
 756        err = rpc_client_register(clnt, pseudoflavor, NULL);
 757        if (err)
 758                goto out_revert;
 759
 760        synchronize_rcu();
 761        if (parent != clnt)
 762                rpc_release_client(parent);
 763        xprt_switch_put(oldxps);
 764        xprt_put(old);
 765        trace_rpc_clnt_replace_xprt(clnt);
 766        return 0;
 767
 768out_revert:
 769        xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
 770        rpc_clnt_set_transport(clnt, old, old_timeo);
 771        clnt->cl_parent = parent;
 772        rpc_client_register(clnt, pseudoflavor, NULL);
 773        xprt_switch_put(xps);
 774        xprt_put(xprt);
 775        trace_rpc_clnt_replace_xprt_err(clnt);
 776        return err;
 777}
 778EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
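
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * moving a quiesced client to a mirrored server.  "new_addr" is a
 * hypothetical placeholder; the xprt_create fields mirror those used in
 * rpc_create() above, and the caller provides the quiescing required by
 * the kernel-doc.
 *
 *	struct xprt_create xargs = {
 *		.ident		= XPRT_TRANSPORT_TCP,
 *		.net		= rpc_net_ns(clnt),
 *		.dstaddr	= (struct sockaddr *)&new_addr,
 *		.addrlen	= sizeof(new_addr),
 *		.servername	= "mirror-server",
 *	};
 *	int err;
 *
 *	err = rpc_switch_client_transport(clnt, &xargs, clnt->cl_timeout);
 */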
 779
 780static
 781int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
 782{
 783        struct rpc_xprt_switch *xps;
 784
 785        rcu_read_lock();
 786        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
 787        rcu_read_unlock();
 788        if (xps == NULL)
 789                return -EAGAIN;
 790        xprt_iter_init_listall(xpi, xps);
 791        xprt_switch_put(xps);
 792        return 0;
 793}
 794
 795/**
 796 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
 797 * @clnt: pointer to client
 798 * @fn: function to apply
 799 * @data: void pointer to function data
 800 *
 801 * Iterates through the list of RPC transports currently attached to the
 802 * client and applies the function fn(clnt, xprt, data).
 803 *
 804 * On error, the iteration stops, and the function returns the error value.
 805 */
 806int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
 807                int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
 808                void *data)
 809{
 810        struct rpc_xprt_iter xpi;
 811        int ret;
 812
 813        ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
 814        if (ret)
 815                return ret;
 816        for (;;) {
 817                struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
 818
 819                if (!xprt)
 820                        break;
 821                ret = fn(clnt, xprt, data);
 822                xprt_put(xprt);
 823                if (ret < 0)
 824                        break;
 825        }
 826        xprt_iter_destroy(&xpi);
 827        return ret;
 828}
 829EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
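
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * a hypothetical iterator callback that counts the attached transports.
 * Returning a negative value from the callback stops the iteration, as
 * described in the kernel-doc above.
 *
 *	static int example_count_xprt(struct rpc_clnt *clnt,
 *				      struct rpc_xprt *xprt, void *data)
 *	{
 *		unsigned int *count = data;
 *
 *		(*count)++;
 *		return 0;
 *	}
 *
 *	unsigned int count = 0;
 *
 *	rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprt, &count);
 */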
 830
 831/*
 832 * Kill all tasks for the given client.
 833 * XXX: kill their descendants as well?
 834 */
 835void rpc_killall_tasks(struct rpc_clnt *clnt)
 836{
 837        struct rpc_task *rovr;
 838
 839
 840        if (list_empty(&clnt->cl_tasks))
 841                return;
 842
  843        /*
  844         * Hold clnt->cl_lock to prevent changes to the cl_tasks list...
  845         */
 846        trace_rpc_clnt_killall(clnt);
 847        spin_lock(&clnt->cl_lock);
 848        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
 849                rpc_signal_task(rovr);
 850        spin_unlock(&clnt->cl_lock);
 851}
 852EXPORT_SYMBOL_GPL(rpc_killall_tasks);
 853
 854/*
 855 * Properly shut down an RPC client, terminating all outstanding
 856 * requests.
 857 */
 858void rpc_shutdown_client(struct rpc_clnt *clnt)
 859{
 860        might_sleep();
 861
 862        trace_rpc_clnt_shutdown(clnt);
 863
 864        while (!list_empty(&clnt->cl_tasks)) {
 865                rpc_killall_tasks(clnt);
 866                wait_event_timeout(destroy_wait,
 867                        list_empty(&clnt->cl_tasks), 1*HZ);
 868        }
 869
 870        rpc_release_client(clnt);
 871}
 872EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 873
 874/*
 875 * Free an RPC client
 876 */
 877static void rpc_free_client_work(struct work_struct *work)
 878{
 879        struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
 880
 881        trace_rpc_clnt_free(clnt);
 882
  883        /* These calls might block on processes that allocate memory, so
  884         * they cannot be made from rpciod; they are handled separately
  885         * here instead.
  886         */
 887        rpc_sysfs_client_destroy(clnt);
 888        rpc_clnt_debugfs_unregister(clnt);
 889        rpc_free_clid(clnt);
 890        rpc_clnt_remove_pipedir(clnt);
 891        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
 892
 893        kfree(clnt);
 894        rpciod_down();
 895}
 896static struct rpc_clnt *
 897rpc_free_client(struct rpc_clnt *clnt)
 898{
 899        struct rpc_clnt *parent = NULL;
 900
 901        trace_rpc_clnt_release(clnt);
 902        if (clnt->cl_parent != clnt)
 903                parent = clnt->cl_parent;
 904        rpc_unregister_client(clnt);
 905        rpc_free_iostats(clnt->cl_metrics);
 906        clnt->cl_metrics = NULL;
 907        xprt_iter_destroy(&clnt->cl_xpi);
 908        put_cred(clnt->cl_cred);
 909
 910        INIT_WORK(&clnt->cl_work, rpc_free_client_work);
 911        schedule_work(&clnt->cl_work);
 912        return parent;
 913}
 914
 915/*
 916 * Free an RPC client
 917 */
 918static struct rpc_clnt *
 919rpc_free_auth(struct rpc_clnt *clnt)
 920{
 921        if (clnt->cl_auth == NULL)
 922                return rpc_free_client(clnt);
 923
 924        /*
 925         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 926         *       release remaining GSS contexts. This mechanism ensures
 927         *       that it can do so safely.
 928         */
 929        atomic_inc(&clnt->cl_count);
 930        rpcauth_release(clnt->cl_auth);
 931        clnt->cl_auth = NULL;
 932        if (atomic_dec_and_test(&clnt->cl_count))
 933                return rpc_free_client(clnt);
 934        return NULL;
 935}
 936
 937/*
 938 * Release reference to the RPC client
 939 */
 940void
 941rpc_release_client(struct rpc_clnt *clnt)
 942{
 943        do {
 944                if (list_empty(&clnt->cl_tasks))
 945                        wake_up(&destroy_wait);
 946                if (!atomic_dec_and_test(&clnt->cl_count))
 947                        break;
 948                clnt = rpc_free_auth(clnt);
 949        } while (clnt != NULL);
 950}
 951EXPORT_SYMBOL_GPL(rpc_release_client);
 952
 953/**
 954 * rpc_bind_new_program - bind a new RPC program to an existing client
 955 * @old: old rpc_client
 956 * @program: rpc program to set
 957 * @vers: rpc program version
 958 *
 959 * Clones the rpc client and sets up a new RPC program. This is mainly
 960 * of use for enabling different RPC programs to share the same transport.
 961 * The Sun NFSv2/v3 ACL protocol can do this.
 962 */
 963struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 964                                      const struct rpc_program *program,
 965                                      u32 vers)
 966{
 967        struct rpc_create_args args = {
 968                .program        = program,
 969                .prognumber     = program->number,
 970                .version        = vers,
 971                .authflavor     = old->cl_auth->au_flavor,
 972                .cred           = old->cl_cred,
 973        };
 974        struct rpc_clnt *clnt;
 975        int err;
 976
 977        clnt = __rpc_clone_client(&args, old);
 978        if (IS_ERR(clnt))
 979                goto out;
 980        err = rpc_ping(clnt);
 981        if (err != 0) {
 982                rpc_shutdown_client(clnt);
 983                clnt = ERR_PTR(err);
 984        }
 985out:
 986        return clnt;
 987}
 988EXPORT_SYMBOL_GPL(rpc_bind_new_program);
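
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * reusing an existing client's transport for a second program, in the
 * style of the NFS ACL protocol mentioned above.  "nfs_clnt" and
 * "example_acl_program" are hypothetical placeholders.
 *
 *	struct rpc_clnt *acl_clnt;
 *
 *	acl_clnt = rpc_bind_new_program(nfs_clnt, &example_acl_program, 3);
 *	if (IS_ERR(acl_clnt))
 *		return PTR_ERR(acl_clnt);
 */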
 989
 990struct rpc_xprt *
 991rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
 992{
 993        struct rpc_xprt_switch *xps;
 994
 995        if (!xprt)
 996                return NULL;
 997        rcu_read_lock();
 998        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
 999        atomic_long_inc(&xps->xps_queuelen);
1000        rcu_read_unlock();
1001        atomic_long_inc(&xprt->queuelen);
1002
1003        return xprt;
1004}
1005
1006static void
1007rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1008{
1009        struct rpc_xprt_switch *xps;
1010
1011        atomic_long_dec(&xprt->queuelen);
1012        rcu_read_lock();
1013        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1014        atomic_long_dec(&xps->xps_queuelen);
1015        rcu_read_unlock();
1016
1017        xprt_put(xprt);
1018}
1019
1020void rpc_task_release_transport(struct rpc_task *task)
1021{
1022        struct rpc_xprt *xprt = task->tk_xprt;
1023
1024        if (xprt) {
1025                task->tk_xprt = NULL;
1026                if (task->tk_client)
1027                        rpc_task_release_xprt(task->tk_client, xprt);
1028                else
1029                        xprt_put(xprt);
1030        }
1031}
1032EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1033
1034void rpc_task_release_client(struct rpc_task *task)
1035{
1036        struct rpc_clnt *clnt = task->tk_client;
1037
1038        rpc_task_release_transport(task);
1039        if (clnt != NULL) {
1040                /* Remove from client task list */
1041                spin_lock(&clnt->cl_lock);
1042                list_del(&task->tk_task);
1043                spin_unlock(&clnt->cl_lock);
1044                task->tk_client = NULL;
1045
1046                rpc_release_client(clnt);
1047        }
1048}
1049
1050static struct rpc_xprt *
1051rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1052{
1053        struct rpc_xprt *xprt;
1054
1055        rcu_read_lock();
1056        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1057        rcu_read_unlock();
1058        return rpc_task_get_xprt(clnt, xprt);
1059}
1060
1061static struct rpc_xprt *
1062rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1063{
1064        return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1065}
1066
1067static
1068void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1069{
1070        if (task->tk_xprt)
1071                return;
1072        if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1073                task->tk_xprt = rpc_task_get_first_xprt(clnt);
1074        else
1075                task->tk_xprt = rpc_task_get_next_xprt(clnt);
1076}
1077
1078static
1079void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1080{
1081
1082        if (clnt != NULL) {
1083                rpc_task_set_transport(task, clnt);
1084                task->tk_client = clnt;
1085                atomic_inc(&clnt->cl_count);
1086                if (clnt->cl_softrtry)
1087                        task->tk_flags |= RPC_TASK_SOFT;
1088                if (clnt->cl_softerr)
1089                        task->tk_flags |= RPC_TASK_TIMEOUT;
1090                if (clnt->cl_noretranstimeo)
1091                        task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1092                if (atomic_read(&clnt->cl_swapper))
1093                        task->tk_flags |= RPC_TASK_SWAPPER;
1094                /* Add to the client's list of all tasks */
1095                spin_lock(&clnt->cl_lock);
1096                list_add_tail(&task->tk_task, &clnt->cl_tasks);
1097                spin_unlock(&clnt->cl_lock);
1098        }
1099}
1100
1101static void
1102rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1103{
1104        if (msg != NULL) {
1105                task->tk_msg.rpc_proc = msg->rpc_proc;
1106                task->tk_msg.rpc_argp = msg->rpc_argp;
1107                task->tk_msg.rpc_resp = msg->rpc_resp;
1108                task->tk_msg.rpc_cred = msg->rpc_cred;
1109                if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1110                        get_cred(task->tk_msg.rpc_cred);
1111        }
1112}
1113
1114/*
1115 * Default callback for async RPC calls
1116 */
1117static void
1118rpc_default_callback(struct rpc_task *task, void *data)
1119{
1120}
1121
1122static const struct rpc_call_ops rpc_default_ops = {
1123        .rpc_call_done = rpc_default_callback,
1124};
1125
1126/**
1127 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1128 * @task_setup_data: pointer to task initialisation data
1129 */
1130struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1131{
1132        struct rpc_task *task;
1133
1134        task = rpc_new_task(task_setup_data);
1135
1136        if (!RPC_IS_ASYNC(task))
1137                task->tk_flags |= RPC_TASK_CRED_NOREF;
1138
1139        rpc_task_set_client(task, task_setup_data->rpc_client);
1140        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1141
1142        if (task->tk_action == NULL)
1143                rpc_call_start(task);
1144
1145        atomic_inc(&task->tk_count);
1146        rpc_execute(task);
1147        return task;
1148}
1149EXPORT_SYMBOL_GPL(rpc_run_task);
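
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * queueing an asynchronous task directly with rpc_run_task().  "msg",
 * "example_call_ops" and "example_data" are hypothetical; the setup
 * fields are those consumed above and in rpc_call_sync()/rpc_call_async()
 * below.
 *
 *	struct rpc_task_setup setup = {
 *		.rpc_client	= clnt,
 *		.rpc_message	= &msg,
 *		.callback_ops	= &example_call_ops,
 *		.callback_data	= example_data,
 *		.flags		= RPC_TASK_ASYNC | RPC_TASK_SOFT,
 *	};
 *	struct rpc_task *task;
 *
 *	task = rpc_run_task(&setup);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	rpc_put_task(task);
 */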
1150
1151/**
1152 * rpc_call_sync - Perform a synchronous RPC call
1153 * @clnt: pointer to RPC client
1154 * @msg: RPC call parameters
1155 * @flags: RPC call flags
1156 */
1157int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1158{
1159        struct rpc_task *task;
1160        struct rpc_task_setup task_setup_data = {
1161                .rpc_client = clnt,
1162                .rpc_message = msg,
1163                .callback_ops = &rpc_default_ops,
1164                .flags = flags,
1165        };
1166        int status;
1167
1168        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1169        if (flags & RPC_TASK_ASYNC) {
1170                rpc_release_calldata(task_setup_data.callback_ops,
1171                        task_setup_data.callback_data);
1172                return -EINVAL;
1173        }
1174
1175        task = rpc_run_task(&task_setup_data);
1176        if (IS_ERR(task))
1177                return PTR_ERR(task);
1178        status = task->tk_status;
1179        rpc_put_task(task);
1180        return status;
1181}
1182EXPORT_SYMBOL_GPL(rpc_call_sync);
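
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * a synchronous call.  "example_proc", "args" and "res" stand in for a
 * protocol's procedure entry and its XDR argument/result structures.
 *
 *	struct rpc_message msg = {
 *		.rpc_proc	= &example_proc,
 *		.rpc_argp	= &args,
 *		.rpc_resp	= &res,
 *	};
 *	int status;
 *
 *	status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
 *	if (status < 0)
 *		pr_debug("example call failed: %d\n", status);
 */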
1183
1184/**
1185 * rpc_call_async - Perform an asynchronous RPC call
1186 * @clnt: pointer to RPC client
1187 * @msg: RPC call parameters
1188 * @flags: RPC call flags
1189 * @tk_ops: RPC call ops
1190 * @data: user call data
1191 */
1192int
1193rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1194               const struct rpc_call_ops *tk_ops, void *data)
1195{
1196        struct rpc_task *task;
1197        struct rpc_task_setup task_setup_data = {
1198                .rpc_client = clnt,
1199                .rpc_message = msg,
1200                .callback_ops = tk_ops,
1201                .callback_data = data,
1202                .flags = flags|RPC_TASK_ASYNC,
1203        };
1204
1205        task = rpc_run_task(&task_setup_data);
1206        if (IS_ERR(task))
1207                return PTR_ERR(task);
1208        rpc_put_task(task);
1209        return 0;
1210}
1211EXPORT_SYMBOL_GPL(rpc_call_async);
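
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * firing an asynchronous call with a completion handler.  The handler is
 * typically invoked from rpciod, so it should avoid blocking; the names
 * below are hypothetical.
 *
 *	static void example_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status < 0)
 *			pr_debug("async call failed: %d\n", task->tk_status);
 *	}
 *
 *	static const struct rpc_call_ops example_call_ops = {
 *		.rpc_call_done	= example_call_done,
 *	};
 *
 *	rpc_call_async(clnt, &msg, RPC_TASK_SOFT, &example_call_ops, NULL);
 */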
1212
1213#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1214static void call_bc_encode(struct rpc_task *task);
1215
1216/**
1217 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1218 * rpc_execute against it
1219 * @req: RPC request
1220 */
1221struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1222{
1223        struct rpc_task *task;
1224        struct rpc_task_setup task_setup_data = {
1225                .callback_ops = &rpc_default_ops,
1226                .flags = RPC_TASK_SOFTCONN |
1227                        RPC_TASK_NO_RETRANS_TIMEOUT,
1228        };
1229
1230        dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1231        /*
1232         * Create an rpc_task to send the data
1233         */
1234        task = rpc_new_task(&task_setup_data);
1235        xprt_init_bc_request(req, task);
1236
1237        task->tk_action = call_bc_encode;
1238        atomic_inc(&task->tk_count);
1239        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1240        rpc_execute(task);
1241
1242        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1243        return task;
1244}
1245#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1246
1247/**
1248 * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1249 * @req: RPC request to prepare
1250 * @pages: vector of struct page pointers
1251 * @base: offset in first page where receive should start, in bytes
1252 * @len: expected size of the upper layer data payload, in bytes
1253 * @hdrsize: expected size of upper layer reply header, in XDR words
1254 *
1255 */
1256void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1257                             unsigned int base, unsigned int len,
1258                             unsigned int hdrsize)
1259{
1260        hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1261
1262        xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1263        trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1264}
1265EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1266
1267void
1268rpc_call_start(struct rpc_task *task)
1269{
1270        task->tk_action = call_start;
1271}
1272EXPORT_SYMBOL_GPL(rpc_call_start);
1273
1274/**
1275 * rpc_peeraddr - extract remote peer address from clnt's xprt
1276 * @clnt: RPC client structure
1277 * @buf: target buffer
1278 * @bufsize: length of target buffer
1279 *
1280 * Returns the number of bytes that are actually in the stored address.
1281 */
1282size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1283{
1284        size_t bytes;
1285        struct rpc_xprt *xprt;
1286
1287        rcu_read_lock();
1288        xprt = rcu_dereference(clnt->cl_xprt);
1289
1290        bytes = xprt->addrlen;
1291        if (bytes > bufsize)
1292                bytes = bufsize;
1293        memcpy(buf, &xprt->addr, bytes);
1294        rcu_read_unlock();
1295
1296        return bytes;
1297}
1298EXPORT_SYMBOL_GPL(rpc_peeraddr);
1299
1300/**
1301 * rpc_peeraddr2str - return remote peer address in printable format
1302 * @clnt: RPC client structure
1303 * @format: address format
1304 *
1305 * NB: the lifetime of the memory referenced by the returned pointer is
1306 * the same as the rpc_xprt itself.  As long as the caller uses this
1307 * pointer, it must hold the RCU read lock.
1308 */
1309const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1310                             enum rpc_display_format_t format)
1311{
1312        struct rpc_xprt *xprt;
1313
1314        xprt = rcu_dereference(clnt->cl_xprt);
1315
1316        if (xprt->address_strings[format] != NULL)
1317                return xprt->address_strings[format];
1318        else
1319                return "unprintable";
1320}
1321EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
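
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * following the RCU requirement stated above while printing the peer
 * address.
 *
 *	rcu_read_lock();
 *	pr_debug("peer is %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 *	rcu_read_unlock();
 */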
1322
1323static const struct sockaddr_in rpc_inaddr_loopback = {
1324        .sin_family             = AF_INET,
1325        .sin_addr.s_addr        = htonl(INADDR_ANY),
1326};
1327
1328static const struct sockaddr_in6 rpc_in6addr_loopback = {
1329        .sin6_family            = AF_INET6,
1330        .sin6_addr              = IN6ADDR_ANY_INIT,
1331};
1332
1333/*
1334 * Try a getsockname() on a connected datagram socket.  Using a
1335 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1336 * This conserves the ephemeral port number space.
1337 *
1338 * Returns zero and fills in "buf" if successful; otherwise, a
1339 * negative errno is returned.
1340 */
1341static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1342                        struct sockaddr *buf)
1343{
1344        struct socket *sock;
1345        int err;
1346
1347        err = __sock_create(net, sap->sa_family,
1348                                SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1349        if (err < 0) {
1350                dprintk("RPC:       can't create UDP socket (%d)\n", err);
1351                goto out;
1352        }
1353
1354        switch (sap->sa_family) {
1355        case AF_INET:
1356                err = kernel_bind(sock,
1357                                (struct sockaddr *)&rpc_inaddr_loopback,
1358                                sizeof(rpc_inaddr_loopback));
1359                break;
1360        case AF_INET6:
1361                err = kernel_bind(sock,
1362                                (struct sockaddr *)&rpc_in6addr_loopback,
1363                                sizeof(rpc_in6addr_loopback));
1364                break;
1365        default:
1366                err = -EAFNOSUPPORT;
1367                goto out;
1368        }
1369        if (err < 0) {
1370                dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1371                goto out_release;
1372        }
1373
1374        err = kernel_connect(sock, sap, salen, 0);
1375        if (err < 0) {
1376                dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1377                goto out_release;
1378        }
1379
1380        err = kernel_getsockname(sock, buf);
1381        if (err < 0) {
1382                dprintk("RPC:       getsockname failed (%d)\n", err);
1383                goto out_release;
1384        }
1385
1386        err = 0;
1387        if (buf->sa_family == AF_INET6) {
1388                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1389                sin6->sin6_scope_id = 0;
1390        }
1391        dprintk("RPC:       %s succeeded\n", __func__);
1392
1393out_release:
1394        sock_release(sock);
1395out:
1396        return err;
1397}
1398
1399/*
 1400 * Scraping a connected socket failed, so we don't have a usable
1401 * local address.  Fallback: generate an address that will prevent
1402 * the server from calling us back.
1403 *
1404 * Returns zero and fills in "buf" if successful; otherwise, a
1405 * negative errno is returned.
1406 */
1407static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1408{
1409        switch (family) {
1410        case AF_INET:
1411                if (buflen < sizeof(rpc_inaddr_loopback))
1412                        return -EINVAL;
1413                memcpy(buf, &rpc_inaddr_loopback,
1414                                sizeof(rpc_inaddr_loopback));
1415                break;
1416        case AF_INET6:
1417                if (buflen < sizeof(rpc_in6addr_loopback))
1418                        return -EINVAL;
1419                memcpy(buf, &rpc_in6addr_loopback,
1420                                sizeof(rpc_in6addr_loopback));
1421                break;
1422        default:
1423                dprintk("RPC:       %s: address family not supported\n",
1424                        __func__);
1425                return -EAFNOSUPPORT;
1426        }
1427        dprintk("RPC:       %s: succeeded\n", __func__);
1428        return 0;
1429}
1430
1431/**
1432 * rpc_localaddr - discover local endpoint address for an RPC client
1433 * @clnt: RPC client structure
1434 * @buf: target buffer
1435 * @buflen: size of target buffer, in bytes
1436 *
 1437 * Returns zero and fills in "buf" if successful;
1438 * otherwise, a negative errno is returned.
1439 *
1440 * This works even if the underlying transport is not currently connected,
1441 * or if the upper layer never previously provided a source address.
1442 *
1443 * The result of this function call is transient: multiple calls in
1444 * succession may give different results, depending on how local
1445 * networking configuration changes over time.
1446 */
1447int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1448{
1449        struct sockaddr_storage address;
1450        struct sockaddr *sap = (struct sockaddr *)&address;
1451        struct rpc_xprt *xprt;
1452        struct net *net;
1453        size_t salen;
1454        int err;
1455
1456        rcu_read_lock();
1457        xprt = rcu_dereference(clnt->cl_xprt);
1458        salen = xprt->addrlen;
1459        memcpy(sap, &xprt->addr, salen);
1460        net = get_net(xprt->xprt_net);
1461        rcu_read_unlock();
1462
1463        rpc_set_port(sap, 0);
1464        err = rpc_sockname(net, sap, salen, buf);
1465        put_net(net);
1466        if (err != 0)
1467                /* Couldn't discover local address, return ANYADDR */
1468                return rpc_anyaddr(sap->sa_family, buf, buflen);
1469        return 0;
1470}
1471EXPORT_SYMBOL_GPL(rpc_localaddr);
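
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * discovering which local address would be used to reach the server.
 *
 *	struct sockaddr_storage addr;
 *	int err;
 *
 *	err = rpc_localaddr(clnt, (struct sockaddr *)&addr, sizeof(addr));
 *	if (err)
 *		return err;
 */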
1472
1473void
1474rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1475{
1476        struct rpc_xprt *xprt;
1477
1478        rcu_read_lock();
1479        xprt = rcu_dereference(clnt->cl_xprt);
1480        if (xprt->ops->set_buffer_size)
1481                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1482        rcu_read_unlock();
1483}
1484EXPORT_SYMBOL_GPL(rpc_setbufsize);
1485
1486/**
1487 * rpc_net_ns - Get the network namespace for this RPC client
1488 * @clnt: RPC client to query
1489 *
1490 */
1491struct net *rpc_net_ns(struct rpc_clnt *clnt)
1492{
1493        struct net *ret;
1494
1495        rcu_read_lock();
1496        ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1497        rcu_read_unlock();
1498        return ret;
1499}
1500EXPORT_SYMBOL_GPL(rpc_net_ns);
1501
1502/**
1503 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1504 * @clnt: RPC client to query
1505 *
1506 * For stream transports, this is one RPC record fragment (see RFC
1507 * 1831), as we don't support multi-record requests yet.  For datagram
1508 * transports, this is the size of an IP packet minus the IP, UDP, and
1509 * RPC header sizes.
1510 */
1511size_t rpc_max_payload(struct rpc_clnt *clnt)
1512{
1513        size_t ret;
1514
1515        rcu_read_lock();
1516        ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1517        rcu_read_unlock();
1518        return ret;
1519}
1520EXPORT_SYMBOL_GPL(rpc_max_payload);
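
/*
 * Editorial example (illustrative sketch, not part of the original file):
 * clamping a caller-chosen transfer size to what the transport can carry.
 * "requested_size" is a hypothetical placeholder.
 *
 *	size_t xfer_max = min_t(size_t, requested_size, rpc_max_payload(clnt));
 */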
1521
1522/**
1523 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1524 * @clnt: RPC client to query
1525 */
1526size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1527{
1528        struct rpc_xprt *xprt;
1529        size_t ret;
1530
1531        rcu_read_lock();
1532        xprt = rcu_dereference(clnt->cl_xprt);
1533        ret = xprt->ops->bc_maxpayload(xprt);
1534        rcu_read_unlock();
1535        return ret;
1536}
1537EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1538
1539unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1540{
1541        struct rpc_xprt *xprt;
1542        unsigned int ret;
1543
1544        rcu_read_lock();
1545        xprt = rcu_dereference(clnt->cl_xprt);
1546        ret = xprt->ops->bc_num_slots(xprt);
1547        rcu_read_unlock();
1548        return ret;
1549}
1550EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1551
1552/**
1553 * rpc_force_rebind - force transport to check that remote port is unchanged
1554 * @clnt: client to rebind
1555 *
1556 */
1557void rpc_force_rebind(struct rpc_clnt *clnt)
1558{
1559        if (clnt->cl_autobind) {
1560                rcu_read_lock();
1561                xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1562                rcu_read_unlock();
1563        }
1564}
1565EXPORT_SYMBOL_GPL(rpc_force_rebind);
1566
1567static int
1568__rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1569{
1570        task->tk_status = 0;
1571        task->tk_rpc_status = 0;
1572        task->tk_action = action;
1573        return 1;
1574}
1575
1576/*
1577 * Restart an (async) RPC call. Usually called from within the
1578 * exit handler.
1579 */
1580int
1581rpc_restart_call(struct rpc_task *task)
1582{
1583        return __rpc_restart_call(task, call_start);
1584}
1585EXPORT_SYMBOL_GPL(rpc_restart_call);
1586
1587/*
1588 * Restart an (async) RPC call from the call_prepare state.
1589 * Usually called from within the exit handler.
1590 */
1591int
1592rpc_restart_call_prepare(struct rpc_task *task)
1593{
1594        if (task->tk_ops->rpc_call_prepare != NULL)
1595                return __rpc_restart_call(task, rpc_prepare_task);
1596        return rpc_restart_call(task);
1597}
1598EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1599
1600const char
1601*rpc_proc_name(const struct rpc_task *task)
1602{
1603        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1604
1605        if (proc) {
1606                if (proc->p_name)
1607                        return proc->p_name;
1608                else
1609                        return "NULL";
1610        } else
1611                return "no proc";
1612}
1613
1614static void
1615__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1616{
1617        trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1618        task->tk_rpc_status = rpc_status;
1619        rpc_exit(task, tk_status);
1620}
1621
1622static void
1623rpc_call_rpcerror(struct rpc_task *task, int status)
1624{
1625        __rpc_call_rpcerror(task, status, status);
1626}
1627
1628/*
1629 * 0.  Initial state
1630 *
1631 *     Other FSM states can be visited zero or more times, but
1632 *     this state is visited exactly once for each RPC.
1633 */
1634static void
1635call_start(struct rpc_task *task)
1636{
1637        struct rpc_clnt *clnt = task->tk_client;
1638        int idx = task->tk_msg.rpc_proc->p_statidx;
1639
1640        trace_rpc_request(task);
1641
1642        /* Increment call count (version might not be valid for ping) */
1643        if (clnt->cl_program->version[clnt->cl_vers])
1644                clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1645        clnt->cl_stats->rpccnt++;
1646        task->tk_action = call_reserve;
1647        rpc_task_set_transport(task, clnt);
1648}
1649
1650/*
1651 * 1.   Reserve an RPC call slot
1652 */
1653static void
1654call_reserve(struct rpc_task *task)
1655{
1656        task->tk_status  = 0;
1657        task->tk_action  = call_reserveresult;
1658        xprt_reserve(task);
1659}
1660
1661static void call_retry_reserve(struct rpc_task *task);
1662
1663/*
1664 * 1b.  Grok the result of xprt_reserve()
1665 */
1666static void
1667call_reserveresult(struct rpc_task *task)
1668{
1669        int status = task->tk_status;
1670
1671        /*
1672         * After a call to xprt_reserve(), we must have either
1673         * a request slot or else an error status.
1674         */
1675        task->tk_status = 0;
1676        if (status >= 0) {
1677                if (task->tk_rqstp) {
1678                        task->tk_action = call_refresh;
1679                        return;
1680                }
1681
1682                rpc_call_rpcerror(task, -EIO);
1683                return;
1684        }
1685
1686        switch (status) {
1687        case -ENOMEM:
1688                rpc_delay(task, HZ >> 2);
1689                fallthrough;
1690        case -EAGAIN:   /* woken up; retry */
1691                task->tk_action = call_retry_reserve;
1692                return;
1693        default:
1694                rpc_call_rpcerror(task, status);
1695        }
1696}
1697
1698/*
1699 * 1c.  Retry reserving an RPC call slot
1700 */
1701static void
1702call_retry_reserve(struct rpc_task *task)
1703{
1704        task->tk_status  = 0;
1705        task->tk_action  = call_reserveresult;
1706        xprt_retry_reserve(task);
1707}
1708
1709/*
1710 * 2.   Bind and/or refresh the credentials
1711 */
1712static void
1713call_refresh(struct rpc_task *task)
1714{
1715        task->tk_action = call_refreshresult;
1716        task->tk_status = 0;
1717        task->tk_client->cl_stats->rpcauthrefresh++;
1718        rpcauth_refreshcred(task);
1719}
1720
1721/*
1722 * 2a.  Process the results of a credential refresh
1723 */
1724static void
1725call_refreshresult(struct rpc_task *task)
1726{
1727        int status = task->tk_status;
1728
1729        task->tk_status = 0;
1730        task->tk_action = call_refresh;
1731        switch (status) {
1732        case 0:
1733                if (rpcauth_uptodatecred(task)) {
1734                        task->tk_action = call_allocate;
1735                        return;
1736                }
1737                /* Use rate-limiting and a max number of retries if refresh
1738                 * had status 0 but failed to update the cred.
1739                 */
1740                fallthrough;
1741        case -ETIMEDOUT:
1742                rpc_delay(task, 3*HZ);
1743                fallthrough;
1744        case -EAGAIN:
1745                status = -EACCES;
1746                fallthrough;
1747        case -EKEYEXPIRED:
1748                if (!task->tk_cred_retry)
1749                        break;
1750                task->tk_cred_retry--;
1751                trace_rpc_retry_refresh_status(task);
1752                return;
1753        }
1754        trace_rpc_refresh_status(task);
1755        rpc_call_rpcerror(task, status);
1756}
1757
1758/*
1759 * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1760 *      (Note: buffer memory is freed in xprt_release).
1761 */
1762static void
1763call_allocate(struct rpc_task *task)
1764{
1765        const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1766        struct rpc_rqst *req = task->tk_rqstp;
1767        struct rpc_xprt *xprt = req->rq_xprt;
1768        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1769        int status;
1770
1771        task->tk_status = 0;
1772        task->tk_action = call_encode;
1773
1774        if (req->rq_buffer)
1775                return;
1776
1777        if (proc->p_proc != 0) {
1778                BUG_ON(proc->p_arglen == 0);
1779                if (proc->p_decode != NULL)
1780                        BUG_ON(proc->p_replen == 0);
1781        }
1782
1783        /*
1784         * Calculate the size (in quads) of the RPC call
1785         * and reply headers, and convert both values
1786         * to byte sizes.
1787         */
1788        req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1789                           proc->p_arglen;
1790        req->rq_callsize <<= 2;
1791        /*
1792         * Note: the reply buffer must at minimum allocate enough space
1793         * for the 'struct accepted_reply' from RFC5531.
1794         */
1795        req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack +
1796                        max_t(size_t, proc->p_replen, 2);
1797        req->rq_rcvsize <<= 2;
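
        /*
         * Worked example (illustrative only): with au_cslack == 4 quads
         * and p_arglen == 10 quads, rq_callsize is RPC_CALLHDRSIZE + 18
         * quads, i.e. (RPC_CALLHDRSIZE + 18) << 2 bytes.  rq_rcvsize is
         * sized the same way from au_rslack and p_replen, with at least
         * 2 quads of body as noted above.
         */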
1798
1799        status = xprt->ops->buf_alloc(task);
1800        trace_rpc_buf_alloc(task, status);
1801        if (status == 0)
1802                return;
1803        if (status != -ENOMEM) {
1804                rpc_call_rpcerror(task, status);
1805                return;
1806        }
1807
1808        if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1809                task->tk_action = call_allocate;
1810                rpc_delay(task, HZ>>4);
1811                return;
1812        }
1813
1814        rpc_call_rpcerror(task, -ERESTARTSYS);
1815}
1816
1817static int
1818rpc_task_need_encode(struct rpc_task *task)
1819{
1820        return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1821                (!(task->tk_flags & RPC_TASK_SENT) ||
1822                 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1823                 xprt_request_need_retransmit(task));
1824}
1825
1826static void
1827rpc_xdr_encode(struct rpc_task *task)
1828{
1829        struct rpc_rqst *req = task->tk_rqstp;
1830        struct xdr_stream xdr;
1831
1832        xdr_buf_init(&req->rq_snd_buf,
1833                     req->rq_buffer,
1834                     req->rq_callsize);
1835        xdr_buf_init(&req->rq_rcv_buf,
1836                     req->rq_rbuffer,
1837                     req->rq_rcvsize);
1838
1839        req->rq_reply_bytes_recvd = 0;
1840        req->rq_snd_buf.head[0].iov_len = 0;
1841        xdr_init_encode(&xdr, &req->rq_snd_buf,
1842                        req->rq_snd_buf.head[0].iov_base, req);
1843        xdr_free_bvec(&req->rq_snd_buf);
1844        if (rpc_encode_header(task, &xdr))
1845                return;
1846
1847        task->tk_status = rpcauth_wrap_req(task, &xdr);
1848}
1849
1850/*
1851 * 3.   Encode arguments of an RPC call
1852 */
1853static void
1854call_encode(struct rpc_task *task)
1855{
1856        if (!rpc_task_need_encode(task))
1857                goto out;
1858
1859        /* Dequeue task from the receive queue while we're encoding */
1860        xprt_request_dequeue_xprt(task);
1861        /* Encode here so that rpcsec_gss can use correct sequence number. */
1862        rpc_xdr_encode(task);
1863        /* Did the encode result in an error condition? */
1864        if (task->tk_status != 0) {
1865                /* Was the error nonfatal? */
1866                switch (task->tk_status) {
1867                case -EAGAIN:
1868                case -ENOMEM:
1869                        rpc_delay(task, HZ >> 4);
1870                        break;
1871                case -EKEYEXPIRED:
1872                        if (!task->tk_cred_retry) {
1873                                rpc_exit(task, task->tk_status);
1874                        } else {
1875                                task->tk_action = call_refresh;
1876                                task->tk_cred_retry--;
1877                                trace_rpc_retry_refresh_status(task);
1878                        }
1879                        break;
1880                default:
1881                        rpc_call_rpcerror(task, task->tk_status);
1882                }
1883                return;
1884        }
1885
1886        /* Add task to reply queue before transmission to avoid races */
1887        if (rpc_reply_expected(task))
1888                xprt_request_enqueue_receive(task);
1889        xprt_request_enqueue_transmit(task);
1890out:
1891        task->tk_action = call_transmit;
1892        /* Check that the connection is OK */
1893        if (!xprt_bound(task->tk_xprt))
1894                task->tk_action = call_bind;
1895        else if (!xprt_connected(task->tk_xprt))
1896                task->tk_action = call_connect;
1897}
1898
1899/*
1900 * Helpers to check if the task was already transmitted, and
1901 * to take action when that is the case.
1902 */
1903static bool
1904rpc_task_transmitted(struct rpc_task *task)
1905{
1906        return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1907}
1908
1909static void
1910rpc_task_handle_transmitted(struct rpc_task *task)
1911{
1912        xprt_end_transmit(task);
1913        task->tk_action = call_transmit_status;
1914}
1915
1916/*
1917 * 4.   Get the server port number if not yet set
1918 */
1919static void
1920call_bind(struct rpc_task *task)
1921{
1922        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1923
1924        if (rpc_task_transmitted(task)) {
1925                rpc_task_handle_transmitted(task);
1926                return;
1927        }
1928
1929        if (xprt_bound(xprt)) {
1930                task->tk_action = call_connect;
1931                return;
1932        }
1933
1934        task->tk_action = call_bind_status;
1935        if (!xprt_prepare_transmit(task))
1936                return;
1937
1938        xprt->ops->rpcbind(task);
1939}
1940
1941/*
1942 * 4a.  Sort out bind result
1943 */
1944static void
1945call_bind_status(struct rpc_task *task)
1946{
1947        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1948        int status = -EIO;
1949
1950        if (rpc_task_transmitted(task)) {
1951                rpc_task_handle_transmitted(task);
1952                return;
1953        }
1954
1955        if (task->tk_status >= 0)
1956                goto out_next;
1957        if (xprt_bound(xprt)) {
1958                task->tk_status = 0;
1959                goto out_next;
1960        }
1961
1962        switch (task->tk_status) {
1963        case -ENOMEM:
1964                rpc_delay(task, HZ >> 2);
1965                goto retry_timeout;
1966        case -EACCES:
1967                trace_rpcb_prog_unavail_err(task);
1968                /* fail immediately if this is an RPC ping */
1969                if (task->tk_msg.rpc_proc->p_proc == 0) {
1970                        status = -EOPNOTSUPP;
1971                        break;
1972                }
1973                if (task->tk_rebind_retry == 0)
1974                        break;
1975                task->tk_rebind_retry--;
1976                rpc_delay(task, 3*HZ);
1977                goto retry_timeout;
1978        case -ENOBUFS:
1979                rpc_delay(task, HZ >> 2);
1980                goto retry_timeout;
1981        case -EAGAIN:
1982                goto retry_timeout;
1983        case -ETIMEDOUT:
1984                trace_rpcb_timeout_err(task);
1985                goto retry_timeout;
1986        case -EPFNOSUPPORT:
1987                /* server doesn't support any rpcbind version we know of */
1988                trace_rpcb_bind_version_err(task);
1989                break;
1990        case -EPROTONOSUPPORT:
1991                trace_rpcb_bind_version_err(task);
1992                goto retry_timeout;
1993        case -ECONNREFUSED:             /* connection problems */
1994        case -ECONNRESET:
1995        case -ECONNABORTED:
1996        case -ENOTCONN:
1997        case -EHOSTDOWN:
1998        case -ENETDOWN:
1999        case -EHOSTUNREACH:
2000        case -ENETUNREACH:
2001        case -EPIPE:
2002                trace_rpcb_unreachable_err(task);
2003                if (!RPC_IS_SOFTCONN(task)) {
2004                        rpc_delay(task, 5*HZ);
2005                        goto retry_timeout;
2006                }
2007                status = task->tk_status;
2008                break;
2009        default:
2010                trace_rpcb_unrecognized_err(task);
2011        }
2012
2013        rpc_call_rpcerror(task, status);
2014        return;
2015out_next:
2016        task->tk_action = call_connect;
2017        return;
2018retry_timeout:
2019        task->tk_status = 0;
2020        task->tk_action = call_bind;
2021        rpc_check_timeout(task);
2022}
2023
2024/*
2025 * 4b.  Connect to the RPC server
2026 */
2027static void
2028call_connect(struct rpc_task *task)
2029{
2030        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2031
2032        if (rpc_task_transmitted(task)) {
2033                rpc_task_handle_transmitted(task);
2034                return;
2035        }
2036
2037        if (xprt_connected(xprt)) {
2038                task->tk_action = call_transmit;
2039                return;
2040        }
2041
2042        task->tk_action = call_connect_status;
2043        if (task->tk_status < 0)
2044                return;
2045        if (task->tk_flags & RPC_TASK_NOCONNECT) {
2046                rpc_call_rpcerror(task, -ENOTCONN);
2047                return;
2048        }
2049        if (!xprt_prepare_transmit(task))
2050                return;
2051        xprt_connect(task);
2052}
2053
2054/*
2055 * 4c.  Sort out connect result
2056 */
2057static void
2058call_connect_status(struct rpc_task *task)
2059{
2060        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2061        struct rpc_clnt *clnt = task->tk_client;
2062        int status = task->tk_status;
2063
2064        if (rpc_task_transmitted(task)) {
2065                rpc_task_handle_transmitted(task);
2066                return;
2067        }
2068
2069        trace_rpc_connect_status(task);
2070
2071        if (task->tk_status == 0) {
2072                clnt->cl_stats->netreconn++;
2073                goto out_next;
2074        }
2075        if (xprt_connected(xprt)) {
2076                task->tk_status = 0;
2077                goto out_next;
2078        }
2079
2080        task->tk_status = 0;
2081        switch (status) {
2082        case -ECONNREFUSED:
2083                /* A positive refusal suggests a rebind is needed. */
2084                if (RPC_IS_SOFTCONN(task))
2085                        break;
2086                if (clnt->cl_autobind) {
2087                        rpc_force_rebind(clnt);
2088                        goto out_retry;
2089                }
2090                fallthrough;
2091        case -ECONNRESET:
2092        case -ECONNABORTED:
2093        case -ENETDOWN:
2094        case -ENETUNREACH:
2095        case -EHOSTUNREACH:
2096        case -EPIPE:
2097        case -EPROTO:
2098                xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2099                                            task->tk_rqstp->rq_connect_cookie);
2100                if (RPC_IS_SOFTCONN(task))
2101                        break;
2102                /* retry with existing socket, after a delay */
2103                rpc_delay(task, 3*HZ);
2104                fallthrough;
2105        case -EADDRINUSE:
2106        case -ENOTCONN:
2107        case -EAGAIN:
2108        case -ETIMEDOUT:
2109                if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2110                    (task->tk_flags & RPC_TASK_MOVEABLE) &&
2111                    test_bit(XPRT_REMOVE, &xprt->state)) {
2112                        struct rpc_xprt *saved = task->tk_xprt;
2113                        struct rpc_xprt_switch *xps;
2114
2115                        rcu_read_lock();
2116                        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2117                        rcu_read_unlock();
2118                        if (xps->xps_nxprts > 1) {
2119                                long value;
2120
2121                                xprt_release(task);
2122                                value = atomic_long_dec_return(&xprt->queuelen);
2123                                if (value == 0)
2124                                        rpc_xprt_switch_remove_xprt(xps, saved);
2125                                xprt_put(saved);
2126                                task->tk_xprt = NULL;
2127                                task->tk_action = call_start;
2128                        }
2129                        xprt_switch_put(xps);
2130                        if (!task->tk_xprt)
2131                                return;
2132                }
2133                goto out_retry;
2134        case -ENOBUFS:
2135                rpc_delay(task, HZ >> 2);
2136                goto out_retry;
2137        }
2138        rpc_call_rpcerror(task, status);
2139        return;
2140out_next:
2141        task->tk_action = call_transmit;
2142        return;
2143out_retry:
2144        /* Check for timeouts before looping back to call_bind */
2145        task->tk_action = call_bind;
2146        rpc_check_timeout(task);
2147}
2148
2149/*
2150 * 5.   Transmit the RPC request, and wait for reply
2151 */
2152static void
2153call_transmit(struct rpc_task *task)
2154{
2155        if (rpc_task_transmitted(task)) {
2156                rpc_task_handle_transmitted(task);
2157                return;
2158        }
2159
2160        task->tk_action = call_transmit_status;
2161        if (!xprt_prepare_transmit(task))
2162                return;
2163        task->tk_status = 0;
2164        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2165                if (!xprt_connected(task->tk_xprt)) {
2166                        task->tk_status = -ENOTCONN;
2167                        return;
2168                }
2169                xprt_transmit(task);
2170        }
2171        xprt_end_transmit(task);
2172}
2173
2174/*
2175 * 5a.  Handle cleanup after a transmission
2176 */
2177static void
2178call_transmit_status(struct rpc_task *task)
2179{
2180        task->tk_action = call_status;
2181
2182        /*
2183         * Common case: success.  Force the compiler to put this
2184         * test first.
2185         */
2186        if (rpc_task_transmitted(task)) {
2187                task->tk_status = 0;
2188                xprt_request_wait_receive(task);
2189                return;
2190        }
2191
2192        switch (task->tk_status) {
2193        default:
2194                break;
2195        case -EBADMSG:
2196                task->tk_status = 0;
2197                task->tk_action = call_encode;
2198                break;
2199                /*
2200                 * Special cases: if we've been waiting on the
2201                 * socket's write_space() callback, or if the
2202                 * socket just returned a connection error,
2203                 * then hold onto the transport lock.
2204                 */
2205        case -ENOBUFS:
2206                rpc_delay(task, HZ>>2);
2207                fallthrough;
2208        case -EBADSLT:
2209        case -EAGAIN:
2210                task->tk_action = call_transmit;
2211                task->tk_status = 0;
2212                break;
2213        case -ECONNREFUSED:
2214        case -EHOSTDOWN:
2215        case -ENETDOWN:
2216        case -EHOSTUNREACH:
2217        case -ENETUNREACH:
2218        case -EPERM:
2219                if (RPC_IS_SOFTCONN(task)) {
2220                        if (!task->tk_msg.rpc_proc->p_proc)
2221                                trace_xprt_ping(task->tk_xprt,
2222                                                task->tk_status);
2223                        rpc_call_rpcerror(task, task->tk_status);
2224                        return;
2225                }
2226                fallthrough;
2227        case -ECONNRESET:
2228        case -ECONNABORTED:
2229        case -EADDRINUSE:
2230        case -ENOTCONN:
2231        case -EPIPE:
2232                task->tk_action = call_bind;
2233                task->tk_status = 0;
2234                break;
2235        }
2236        rpc_check_timeout(task);
2237}
2238
2239#if defined(CONFIG_SUNRPC_BACKCHANNEL)
2240static void call_bc_transmit(struct rpc_task *task);
2241static void call_bc_transmit_status(struct rpc_task *task);
2242
2243static void
2244call_bc_encode(struct rpc_task *task)
2245{
2246        xprt_request_enqueue_transmit(task);
2247        task->tk_action = call_bc_transmit;
2248}
2249
2250/*
2251 * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2252 * addition, disconnect on connectivity errors.
2253 */
2254static void
2255call_bc_transmit(struct rpc_task *task)
2256{
2257        task->tk_action = call_bc_transmit_status;
2258        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2259                if (!xprt_prepare_transmit(task))
2260                        return;
2261                task->tk_status = 0;
2262                xprt_transmit(task);
2263        }
2264        xprt_end_transmit(task);
2265}
2266
2267static void
2268call_bc_transmit_status(struct rpc_task *task)
2269{
2270        struct rpc_rqst *req = task->tk_rqstp;
2271
2272        if (rpc_task_transmitted(task))
2273                task->tk_status = 0;
2274
2275        switch (task->tk_status) {
2276        case 0:
2277                /* Success */
2278        case -ENETDOWN:
2279        case -EHOSTDOWN:
2280        case -EHOSTUNREACH:
2281        case -ENETUNREACH:
2282        case -ECONNRESET:
2283        case -ECONNREFUSED:
2284        case -EADDRINUSE:
2285        case -ENOTCONN:
2286        case -EPIPE:
2287                break;
2288        case -ENOBUFS:
2289                rpc_delay(task, HZ>>2);
2290                fallthrough;
2291        case -EBADSLT:
2292        case -EAGAIN:
2293                task->tk_status = 0;
2294                task->tk_action = call_bc_transmit;
2295                return;
2296        case -ETIMEDOUT:
2297                /*
2298                 * Problem reaching the server.  Disconnect and let the
2299                 * forechannel reestablish the connection.  The server will
2300                 * have to retransmit the backchannel request and we'll
2301                 * reprocess it.  Since these ops are idempotent, there's no
2302                 * need to cache our reply at this time.
2303                 */
2304                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2305                        "error: %d\n", task->tk_status);
2306                xprt_conditional_disconnect(req->rq_xprt,
2307                        req->rq_connect_cookie);
2308                break;
2309        default:
2310                /*
2311                 * We were unable to reply and will have to drop the
2312                 * request.  The server should reconnect and retransmit.
2313                 */
2314                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2315                        "error: %d\n", task->tk_status);
2316                break;
2317        }
2318        task->tk_action = rpc_exit_task;
2319}
2320#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2321
2322/*
2323 * 6.   Sort out the RPC call status
2324 */
2325static void
2326call_status(struct rpc_task *task)
2327{
2328        struct rpc_clnt *clnt = task->tk_client;
2329        int             status;
2330
2331        if (!task->tk_msg.rpc_proc->p_proc)
2332                trace_xprt_ping(task->tk_xprt, task->tk_status);
2333
2334        status = task->tk_status;
2335        if (status >= 0) {
2336                task->tk_action = call_decode;
2337                return;
2338        }
2339
2340        trace_rpc_call_status(task);
2341        task->tk_status = 0;
2342        switch (status) {
2343        case -EHOSTDOWN:
2344        case -ENETDOWN:
2345        case -EHOSTUNREACH:
2346        case -ENETUNREACH:
2347        case -EPERM:
2348                if (RPC_IS_SOFTCONN(task))
2349                        goto out_exit;
2350                /*
2351                 * Delay any retries for 3 seconds, then handle as if it
2352                 * were a timeout.
2353                 */
2354                rpc_delay(task, 3*HZ);
2355                fallthrough;
2356        case -ETIMEDOUT:
2357                break;
2358        case -ECONNREFUSED:
2359        case -ECONNRESET:
2360        case -ECONNABORTED:
2361        case -ENOTCONN:
2362                rpc_force_rebind(clnt);
2363                break;
2364        case -EADDRINUSE:
2365                rpc_delay(task, 3*HZ);
2366                fallthrough;
2367        case -EPIPE:
2368        case -EAGAIN:
2369                break;
2370        case -EIO:
2371                /* shutdown or soft timeout */
2372                goto out_exit;
2373        default:
2374                if (clnt->cl_chatty)
2375                        printk("%s: RPC call returned error %d\n",
2376                               clnt->cl_program->name, -status);
2377                goto out_exit;
2378        }
2379        task->tk_action = call_encode;
2380        if (status != -ECONNRESET && status != -ECONNABORTED)
2381                rpc_check_timeout(task);
2382        return;
2383out_exit:
2384        rpc_call_rpcerror(task, status);
2385}
2386
2387static bool
2388rpc_check_connected(const struct rpc_rqst *req)
2389{
2390        /* No allocated request or transport? return true */
2391        if (!req || !req->rq_xprt)
2392                return true;
2393        return xprt_connected(req->rq_xprt);
2394}
2395
2396static void
2397rpc_check_timeout(struct rpc_task *task)
2398{
2399        struct rpc_clnt *clnt = task->tk_client;
2400
2401        if (RPC_SIGNALLED(task)) {
2402                rpc_call_rpcerror(task, -ERESTARTSYS);
2403                return;
2404        }
2405
2406        if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2407                return;
2408
2409        trace_rpc_timeout_status(task);
2410        task->tk_timeouts++;
2411
2412        if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2413                rpc_call_rpcerror(task, -ETIMEDOUT);
2414                return;
2415        }
2416
2417        if (RPC_IS_SOFT(task)) {
2418                /*
2419                 * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2420                 * been sent, it should time out only if the transport
2421                 * connection gets terminally broken.
2422                 */
2423                if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2424                    rpc_check_connected(task->tk_rqstp))
2425                        return;
2426
2427                if (clnt->cl_chatty) {
2428                        pr_notice_ratelimited(
2429                                "%s: server %s not responding, timed out\n",
2430                                clnt->cl_program->name,
2431                                task->tk_xprt->servername);
2432                }
2433                if (task->tk_flags & RPC_TASK_TIMEOUT)
2434                        rpc_call_rpcerror(task, -ETIMEDOUT);
2435                else
2436                        __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2437                return;
2438        }
2439
2440        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2441                task->tk_flags |= RPC_CALL_MAJORSEEN;
2442                if (clnt->cl_chatty) {
2443                        pr_notice_ratelimited(
2444                                "%s: server %s not responding, still trying\n",
2445                                clnt->cl_program->name,
2446                                task->tk_xprt->servername);
2447                }
2448        }
2449        rpc_force_rebind(clnt);
2450        /*
2451         * Did our request time out due to an RPCSEC_GSS out-of-sequence
2452         * event? RFC2203 requires the server to drop all such requests.
2453         */
2454        rpcauth_invalcred(task);
2455}
2456
2457/*
2458 * 7.   Decode the RPC reply
2459 */
2460static void
2461call_decode(struct rpc_task *task)
2462{
2463        struct rpc_clnt *clnt = task->tk_client;
2464        struct rpc_rqst *req = task->tk_rqstp;
2465        struct xdr_stream xdr;
2466        int err;
2467
2468        if (!task->tk_msg.rpc_proc->p_decode) {
2469                task->tk_action = rpc_exit_task;
2470                return;
2471        }
2472
2473        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2474                if (clnt->cl_chatty) {
2475                        pr_notice_ratelimited("%s: server %s OK\n",
2476                                clnt->cl_program->name,
2477                                task->tk_xprt->servername);
2478                }
2479                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2480        }
2481
2482        /*
2483         * Did we ever call xprt_complete_rqst()? If not, we should assume
2484         * the message is incomplete.
2485         */
2486        err = -EAGAIN;
2487        if (!req->rq_reply_bytes_recvd)
2488                goto out;
2489
2490        /* Ensure that we see all writes made by xprt_complete_rqst()
2491         * before it changed req->rq_reply_bytes_recvd.
2492         */
2493        smp_rmb();
2494
2495        req->rq_rcv_buf.len = req->rq_private_buf.len;
2496        trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2497
2498        /* Check that the softirq receive buffer is valid */
2499        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2500                                sizeof(req->rq_rcv_buf)) != 0);
2501
2502        xdr_init_decode(&xdr, &req->rq_rcv_buf,
2503                        req->rq_rcv_buf.head[0].iov_base, req);
2504        err = rpc_decode_header(task, &xdr);
2505out:
2506        switch (err) {
2507        case 0:
2508                task->tk_action = rpc_exit_task;
2509                task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2510                return;
2511        case -EAGAIN:
2512                task->tk_status = 0;
2513                if (task->tk_client->cl_discrtry)
2514                        xprt_conditional_disconnect(req->rq_xprt,
2515                                                    req->rq_connect_cookie);
2516                task->tk_action = call_encode;
2517                rpc_check_timeout(task);
2518                break;
2519        case -EKEYREJECTED:
2520                task->tk_action = call_reserve;
2521                rpc_check_timeout(task);
2522                rpcauth_invalcred(task);
2523                /* Ensure we obtain a new XID if we retry! */
2524                xprt_release(task);
2525        }
2526}
2527
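/*
 * Marshal the fixed portion of the RPC call header defined by RFC 5531:
 * XID, message direction (CALL), RPC version 2, program, version and
 * procedure numbers, followed by the credential and verifier emitted by
 * rpcauth_marshcred().
 */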
2528static int
2529rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2530{
2531        struct rpc_clnt *clnt = task->tk_client;
2532        struct rpc_rqst *req = task->tk_rqstp;
2533        __be32 *p;
2534        int error;
2535
2536        error = -EMSGSIZE;
2537        p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2538        if (!p)
2539                goto out_fail;
2540        *p++ = req->rq_xid;
2541        *p++ = rpc_call;
2542        *p++ = cpu_to_be32(RPC_VERSION);
2543        *p++ = cpu_to_be32(clnt->cl_prog);
2544        *p++ = cpu_to_be32(clnt->cl_vers);
2545        *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2546
2547        error = rpcauth_marshcred(task, xdr);
2548        if (error < 0)
2549                goto out_fail;
2550        return 0;
2551out_fail:
2552        trace_rpc_bad_callhdr(task);
2553        rpc_call_rpcerror(task, error);
2554        return error;
2555}
2556
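/*
 * Parse the RPC reply header (RFC 5531): skip the XID, check the reply
 * direction and whether the call was accepted or denied, verify the
 * server's verifier, then translate the accept/reject status into an
 * errno, arranging a retry where the status suggests a garbled request
 * or a stale credential.
 */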
2557static noinline int
2558rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2559{
2560        struct rpc_clnt *clnt = task->tk_client;
2561        int error;
2562        __be32 *p;
2563
2564        /* RFC-1014 says that the representation of XDR data must be a
2565         * multiple of four bytes
2566         * - if it isn't, pointer subtraction in the NFS client may give
2567         *   undefined results
2568         */
2569        if (task->tk_rqstp->rq_rcv_buf.len & 3)
2570                goto out_unparsable;
2571
2572        p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2573        if (!p)
2574                goto out_unparsable;
2575        p++;    /* skip XID */
2576        if (*p++ != rpc_reply)
2577                goto out_unparsable;
2578        if (*p++ != rpc_msg_accepted)
2579                goto out_msg_denied;
2580
2581        error = rpcauth_checkverf(task, xdr);
2582        if (error)
2583                goto out_verifier;
2584
2585        p = xdr_inline_decode(xdr, sizeof(*p));
2586        if (!p)
2587                goto out_unparsable;
2588        switch (*p) {
2589        case rpc_success:
2590                return 0;
2591        case rpc_prog_unavail:
2592                trace_rpc__prog_unavail(task);
2593                error = -EPFNOSUPPORT;
2594                goto out_err;
2595        case rpc_prog_mismatch:
2596                trace_rpc__prog_mismatch(task);
2597                error = -EPROTONOSUPPORT;
2598                goto out_err;
2599        case rpc_proc_unavail:
2600                trace_rpc__proc_unavail(task);
2601                error = -EOPNOTSUPP;
2602                goto out_err;
2603        case rpc_garbage_args:
2604        case rpc_system_err:
2605                trace_rpc__garbage_args(task);
2606                error = -EIO;
2607                break;
2608        default:
2609                goto out_unparsable;
2610        }
2611
2612out_garbage:
2613        clnt->cl_stats->rpcgarbage++;
2614        if (task->tk_garb_retry) {
2615                task->tk_garb_retry--;
2616                task->tk_action = call_encode;
2617                return -EAGAIN;
2618        }
2619out_err:
2620        rpc_call_rpcerror(task, error);
2621        return error;
2622
2623out_unparsable:
2624        trace_rpc__unparsable(task);
2625        error = -EIO;
2626        goto out_garbage;
2627
2628out_verifier:
2629        trace_rpc_bad_verifier(task);
2630        goto out_garbage;
2631
2632out_msg_denied:
2633        error = -EACCES;
2634        p = xdr_inline_decode(xdr, sizeof(*p));
2635        if (!p)
2636                goto out_unparsable;
2637        switch (*p++) {
2638        case rpc_auth_error:
2639                break;
2640        case rpc_mismatch:
2641                trace_rpc__mismatch(task);
2642                error = -EPROTONOSUPPORT;
2643                goto out_err;
2644        default:
2645                goto out_unparsable;
2646        }
2647
2648        p = xdr_inline_decode(xdr, sizeof(*p));
2649        if (!p)
2650                goto out_unparsable;
2651        switch (*p++) {
2652        case rpc_autherr_rejectedcred:
2653        case rpc_autherr_rejectedverf:
2654        case rpcsec_gsserr_credproblem:
2655        case rpcsec_gsserr_ctxproblem:
2656                if (!task->tk_cred_retry)
2657                        break;
2658                task->tk_cred_retry--;
2659                trace_rpc__stale_creds(task);
2660                return -EKEYREJECTED;
2661        case rpc_autherr_badcred:
2662        case rpc_autherr_badverf:
2663                /* possibly garbled cred/verf? */
2664                if (!task->tk_garb_retry)
2665                        break;
2666                task->tk_garb_retry--;
2667                trace_rpc__bad_creds(task);
2668                task->tk_action = call_encode;
2669                return -EAGAIN;
2670        case rpc_autherr_tooweak:
2671                trace_rpc__auth_tooweak(task);
2672                pr_warn("RPC: server %s requires stronger authentication.\n",
2673                        task->tk_xprt->servername);
2674                break;
2675        default:
2676                goto out_unparsable;
2677        }
2678        goto out_err;
2679}
2680
2681static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2682                const void *obj)
2683{
2684}
2685
2686static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2687                void *obj)
2688{
2689        return 0;
2690}
2691
2692static const struct rpc_procinfo rpcproc_null = {
2693        .p_encode = rpcproc_encode_null,
2694        .p_decode = rpcproc_decode_null,
2695};
2696
2697static int rpc_ping(struct rpc_clnt *clnt)
2698{
2699        struct rpc_message msg = {
2700                .rpc_proc = &rpcproc_null,
2701        };
2702        int err;
2703        err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2704                            RPC_TASK_NULLCREDS);
2705        return err;
2706}
2707
2708static
2709struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2710                struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2711                const struct rpc_call_ops *ops, void *data)
2712{
2713        struct rpc_message msg = {
2714                .rpc_proc = &rpcproc_null,
2715        };
2716        struct rpc_task_setup task_setup_data = {
2717                .rpc_client = clnt,
2718                .rpc_xprt = xprt,
2719                .rpc_message = &msg,
2720                .rpc_op_cred = cred,
2721                .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2722                .callback_data = data,
2723                .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2724                         RPC_TASK_NULLCREDS,
2725        };
2726
2727        return rpc_run_task(&task_setup_data);
2728}
2729
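/**
 * rpc_call_null - Start an RPC NULL (procedure 0) request
 * @clnt: RPC client on which to issue the request
 * @cred: credential to use, or NULL
 * @flags: RPC task flags
 *
 * Return: the newly started rpc_task, or an ERR_PTR on failure.
 */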
2730struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2731{
2732        return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2733}
2734EXPORT_SYMBOL_GPL(rpc_call_null);
2735
2736struct rpc_cb_add_xprt_calldata {
2737        struct rpc_xprt_switch *xps;
2738        struct rpc_xprt *xprt;
2739};
2740
2741static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2742{
2743        struct rpc_cb_add_xprt_calldata *data = calldata;
2744
2745        if (task->tk_status == 0)
2746                rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2747}
2748
2749static void rpc_cb_add_xprt_release(void *calldata)
2750{
2751        struct rpc_cb_add_xprt_calldata *data = calldata;
2752
2753        xprt_put(data->xprt);
2754        xprt_switch_put(data->xps);
2755        kfree(data);
2756}
2757
2758static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2759        .rpc_call_done = rpc_cb_add_xprt_done,
2760        .rpc_release = rpc_cb_add_xprt_release,
2761};
2762
2763/**
2764 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2765 * @clnt: pointer to struct rpc_clnt
2766 * @xps: pointer to struct rpc_xprt_switch
2767 * @xprt: pointer to struct rpc_xprt
2768 * @dummy: unused
2769 */
2770int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2771                struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2772                void *dummy)
2773{
2774        struct rpc_cb_add_xprt_calldata *data;
2775        struct rpc_task *task;
2776
2777        data = kmalloc(sizeof(*data), GFP_NOFS);
2778        if (!data)
2779                return -ENOMEM;
2780        data->xps = xprt_switch_get(xps);
2781        data->xprt = xprt_get(xprt);
2782        if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2783                rpc_cb_add_xprt_release(data);
2784                goto success;
2785        }
2786
2787        task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2788                        &rpc_cb_add_xprt_call_ops, data);
2789
2790        rpc_put_task(task);
2791success:
2792        return 1;
2793}
2794EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2795
2796/**
2797 * rpc_clnt_setup_test_and_add_xprt - test and add a new transport to a rpc_clnt
2798 *
2799 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2800 *   1) caller of the test function must dereference the rpc_xprt_switch
2801 *   and the rpc_xprt.
2802 *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2803 *   the rpc_call_done routine.
2804 *
2805 * Upon success (return of 1), the test function adds the new
2806 * transport to the rpc_clnt xprt switch
2807 *
2808 * @clnt: struct rpc_clnt to get the new transport
2809 * @xps:  the rpc_xprt_switch to hold the new transport
2810 * @xprt: the rpc_xprt to test
2811 * @data: a struct rpc_add_xprt_test pointer that holds the test function
2812 *        and test function call data
2813 */
2814int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2815                                     struct rpc_xprt_switch *xps,
2816                                     struct rpc_xprt *xprt,
2817                                     void *data)
2818{
2819        struct rpc_task *task;
2820        struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2821        int status = -EADDRINUSE;
2822
2823        xprt = xprt_get(xprt);
2824        xprt_switch_get(xps);
2825
2826        if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2827                goto out_err;
2828
2829        /* Test the connection */
2830        task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2831        if (IS_ERR(task)) {
2832                status = PTR_ERR(task);
2833                goto out_err;
2834        }
2835        status = task->tk_status;
2836        rpc_put_task(task);
2837
2838        if (status < 0)
2839                goto out_err;
2840
2841        /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2842        xtest->add_xprt_test(clnt, xprt, xtest->data);
2843
2844        xprt_put(xprt);
2845        xprt_switch_put(xps);
2846
2847        /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2848        return 1;
2849out_err:
2850        xprt_put(xprt);
2851        xprt_switch_put(xps);
2852        pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2853                status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2854        return status;
2855}
2856EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2857
2858/**
2859 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2860 * @clnt: pointer to struct rpc_clnt
2861 * @xprtargs: pointer to struct xprt_create
2862 * @setup: callback to test and/or set up the connection
2863 * @data: pointer to setup function data
2864 *
2865 * Creates a new transport using the parameters set in args and
2866 * adds it to clnt.
2867 * If ping is set, then test that connectivity succeeds before
2868 * adding the new transport.
2869 *
2870 */
2871int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2872                struct xprt_create *xprtargs,
2873                int (*setup)(struct rpc_clnt *,
2874                        struct rpc_xprt_switch *,
2875                        struct rpc_xprt *,
2876                        void *),
2877                void *data)
2878{
2879        struct rpc_xprt_switch *xps;
2880        struct rpc_xprt *xprt;
2881        unsigned long connect_timeout;
2882        unsigned long reconnect_timeout;
2883        unsigned char resvport, reuseport;
2884        int ret = 0;
2885
2886        rcu_read_lock();
2887        xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2888        xprt = xprt_iter_xprt(&clnt->cl_xpi);
2889        if (xps == NULL || xprt == NULL) {
2890                rcu_read_unlock();
2891                xprt_switch_put(xps);
2892                return -EAGAIN;
2893        }
2894        resvport = xprt->resvport;
2895        reuseport = xprt->reuseport;
2896        connect_timeout = xprt->connect_timeout;
2897        reconnect_timeout = xprt->max_reconnect_timeout;
2898        rcu_read_unlock();
2899
2900        xprt = xprt_create_transport(xprtargs);
2901        if (IS_ERR(xprt)) {
2902                ret = PTR_ERR(xprt);
2903                goto out_put_switch;
2904        }
2905        xprt->resvport = resvport;
2906        xprt->reuseport = reuseport;
2907        if (xprt->ops->set_connect_timeout != NULL)
2908                xprt->ops->set_connect_timeout(xprt,
2909                                connect_timeout,
2910                                reconnect_timeout);
2911
2912        rpc_xprt_switch_set_roundrobin(xps);
2913        if (setup) {
2914                ret = setup(clnt, xps, xprt, data);
2915                if (ret != 0)
2916                        goto out_put_xprt;
2917        }
2918        rpc_xprt_switch_add_xprt(xps, xprt);
2919out_put_xprt:
2920        xprt_put(xprt);
2921out_put_switch:
2922        xprt_switch_put(xps);
2923        return ret;
2924}
2925EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
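
/*
 * Illustrative usage sketch (not part of this file).  A caller adding a
 * trunked connection might pair this with rpc_clnt_setup_test_and_add_xprt()
 * above; "my_trunk_test", "addr" and "hostname" are hypothetical, and the
 * struct xprt_create fields shown are assumptions based on typical users:
 *
 *      struct rpc_add_xprt_test test = {
 *              .add_xprt_test  = my_trunk_test,
 *              .data           = NULL,
 *      };
 *      struct xprt_create args = {
 *              .ident          = XPRT_TRANSPORT_TCP,
 *              .net            = rpc_net_ns(clnt),
 *              .dstaddr        = (struct sockaddr *)&addr,
 *              .addrlen        = sizeof(addr),
 *              .servername     = hostname,
 *      };
 *
 *      rpc_clnt_add_xprt(clnt, &args, rpc_clnt_setup_test_and_add_xprt, &test);
 */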
2926
2927struct connect_timeout_data {
2928        unsigned long connect_timeout;
2929        unsigned long reconnect_timeout;
2930};
2931
2932static int
2933rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2934                struct rpc_xprt *xprt,
2935                void *data)
2936{
2937        struct connect_timeout_data *timeo = data;
2938
2939        if (xprt->ops->set_connect_timeout)
2940                xprt->ops->set_connect_timeout(xprt,
2941                                timeo->connect_timeout,
2942                                timeo->reconnect_timeout);
2943        return 0;
2944}
2945
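/**
 * rpc_set_connect_timeout - Apply new connect timeouts to all transports
 * @clnt: RPC client to adjust
 * @connect_timeout: new connect timeout
 * @reconnect_timeout: new maximum reconnect timeout
 *
 * Transports whose ops do not implement set_connect_timeout are left
 * unchanged.
 */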
2946void
2947rpc_set_connect_timeout(struct rpc_clnt *clnt,
2948                unsigned long connect_timeout,
2949                unsigned long reconnect_timeout)
2950{
2951        struct connect_timeout_data timeout = {
2952                .connect_timeout = connect_timeout,
2953                .reconnect_timeout = reconnect_timeout,
2954        };
2955        rpc_clnt_iterate_for_each_xprt(clnt,
2956                        rpc_xprt_set_connect_timeout,
2957                        &timeout);
2958}
2959EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2960
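/**
 * rpc_clnt_xprt_switch_put - Drop a reference to the rpc_clnt's xprt switch
 * @clnt: RPC client
 */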
2961void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2962{
2963        rcu_read_lock();
2964        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2965        rcu_read_unlock();
2966}
2967EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2968
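/**
 * rpc_clnt_xprt_switch_add_xprt - Add a transport to the rpc_clnt's xprt switch
 * @clnt: RPC client
 * @xprt: transport to add
 */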
2969void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2970{
2971        rcu_read_lock();
2972        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2973                                 xprt);
2974        rcu_read_unlock();
2975}
2976EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2977
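/**
 * rpc_clnt_xprt_switch_has_addr - Test if the xprt switch already has an address
 * @clnt: RPC client
 * @sap: socket address to look for
 */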
2978bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2979                                   const struct sockaddr *sap)
2980{
2981        struct rpc_xprt_switch *xps;
2982        bool ret;
2983
2984        rcu_read_lock();
2985        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2986        ret = rpc_xprt_switch_has_addr(xps, sap);
2987        rcu_read_unlock();
2988        return ret;
2989}
2990EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2991
2992#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2993static void rpc_show_header(void)
2994{
2995        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2996                "-timeout ---ops--\n");
2997}
2998
2999static void rpc_show_task(const struct rpc_clnt *clnt,
3000                          const struct rpc_task *task)
3001{
3002        const char *rpc_waitq = "none";
3003
3004        if (RPC_IS_QUEUED(task))
3005                rpc_waitq = rpc_qname(task->tk_waitqueue);
3006
3007        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3008                task->tk_pid, task->tk_flags, task->tk_status,
3009                clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3010                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3011                task->tk_action, rpc_waitq);
3012}
3013
3014void rpc_show_tasks(struct net *net)
3015{
3016        struct rpc_clnt *clnt;
3017        struct rpc_task *task;
3018        int header = 0;
3019        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3020
3021        spin_lock(&sn->rpc_client_lock);
3022        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3023                spin_lock(&clnt->cl_lock);
3024                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3025                        if (!header) {
3026                                rpc_show_header();
3027                                header++;
3028                        }
3029                        rpc_show_task(clnt, task);
3030                }
3031                spin_unlock(&clnt->cl_lock);
3032        }
3033        spin_unlock(&sn->rpc_client_lock);
3034}
3035#endif
3036
3037#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3038static int
3039rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3040                struct rpc_xprt *xprt,
3041                void *dummy)
3042{
3043        return xprt_enable_swap(xprt);
3044}
3045
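/**
 * rpc_clnt_swap_activate - Enable swapping on the client's transports
 * @clnt: RPC client whose transports should allow swap allocations
 *
 * Enables swap on every transport via xprt_enable_swap().  Activations
 * are counted in cl_swapper, so the transports are only touched on the
 * first activation; rpc_clnt_swap_deactivate() undoes this once the
 * count drops back to zero.
 */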
3046int
3047rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3048{
3049        if (atomic_inc_return(&clnt->cl_swapper) == 1)
3050                return rpc_clnt_iterate_for_each_xprt(clnt,
3051                                rpc_clnt_swap_activate_callback, NULL);
3052        return 0;
3053}
3054EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3055
3056static int
3057rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3058                struct rpc_xprt *xprt,
3059                void *dummy)
3060{
3061        xprt_disable_swap(xprt);
3062        return 0;
3063}
3064
3065void
3066rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3067{
3068        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3069                rpc_clnt_iterate_for_each_xprt(clnt,
3070                                rpc_clnt_swap_deactivate_callback, NULL);
3071}
3072EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3073#endif /* CONFIG_SUNRPC_SWAP */
3074