linux/net/sunrpc/svc.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/svc.c
   3 *
   4 * High-level RPC service routines
   5 *
   6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
   7 *
   8 * Multiple threads pools and NUMAisation
   9 * Copyright (c) 2006 Silicon Graphics, Inc.
  10 * by Greg Banks <gnb@melbourne.sgi.com>
  11 */
  12
  13#include <linux/linkage.h>
  14#include <linux/sched.h>
  15#include <linux/errno.h>
  16#include <linux/net.h>
  17#include <linux/in.h>
  18#include <linux/mm.h>
  19#include <linux/interrupt.h>
  20#include <linux/module.h>
  21#include <linux/kthread.h>
  22#include <linux/slab.h>
  23
  24#include <linux/sunrpc/types.h>
  25#include <linux/sunrpc/xdr.h>
  26#include <linux/sunrpc/stats.h>
  27#include <linux/sunrpc/svcsock.h>
  28#include <linux/sunrpc/clnt.h>
  29#include <linux/sunrpc/bc_xprt.h>
  30
  31#define RPCDBG_FACILITY RPCDBG_SVCDSP
  32
/*
 * Forward declaration: svc_rpcb_setup() and svc_rpcb_cleanup() below
 * call svc_unregister() before its definition has been seen.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net);

/*
 * A serv is treated as "pooled" when its sv_function is set;
 * svc_create_pooled() is the place in this file that sets it.
 */
#define svc_serv_is_pooled(serv)    ((serv)->sv_function)
  36
  37/*
  38 * Mode for mapping cpus to pools.
  39 */
  40enum {
  41        SVC_POOL_AUTO = -1,     /* choose one of the others */
  42        SVC_POOL_GLOBAL,        /* no mapping, just a single global pool
  43                                 * (legacy & UP mode) */
  44        SVC_POOL_PERCPU,        /* one pool per cpu */
  45        SVC_POOL_PERNODE        /* one pool per numa node */
  46};
  47#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL
  48
  49/*
  50 * Structure for mapping cpus to pools and vice versa.
  51 * Setup once during sunrpc initialisation.
  52 */
  53static struct svc_pool_map {
  54        int count;                      /* How many svc_servs use us */
  55        int mode;                       /* Note: int not enum to avoid
  56                                         * warnings about "enumeration value
  57                                         * not handled in switch" */
  58        unsigned int npools;
  59        unsigned int *pool_to;          /* maps pool id to cpu or node */
  60        unsigned int *to_pool;          /* maps cpu or node to pool id */
  61} svc_pool_map = {
  62        .count = 0,
  63        .mode = SVC_POOL_DEFAULT
  64};
  65static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */
  66
  67static int
  68param_set_pool_mode(const char *val, struct kernel_param *kp)
  69{
  70        int *ip = (int *)kp->arg;
  71        struct svc_pool_map *m = &svc_pool_map;
  72        int err;
  73
  74        mutex_lock(&svc_pool_map_mutex);
  75
  76        err = -EBUSY;
  77        if (m->count)
  78                goto out;
  79
  80        err = 0;
  81        if (!strncmp(val, "auto", 4))
  82                *ip = SVC_POOL_AUTO;
  83        else if (!strncmp(val, "global", 6))
  84                *ip = SVC_POOL_GLOBAL;
  85        else if (!strncmp(val, "percpu", 6))
  86                *ip = SVC_POOL_PERCPU;
  87        else if (!strncmp(val, "pernode", 7))
  88                *ip = SVC_POOL_PERNODE;
  89        else
  90                err = -EINVAL;
  91
  92out:
  93        mutex_unlock(&svc_pool_map_mutex);
  94        return err;
  95}
  96
  97static int
  98param_get_pool_mode(char *buf, struct kernel_param *kp)
  99{
 100        int *ip = (int *)kp->arg;
 101
 102        switch (*ip)
 103        {
 104        case SVC_POOL_AUTO:
 105                return strlcpy(buf, "auto", 20);
 106        case SVC_POOL_GLOBAL:
 107                return strlcpy(buf, "global", 20);
 108        case SVC_POOL_PERCPU:
 109                return strlcpy(buf, "percpu", 20);
 110        case SVC_POOL_PERNODE:
 111                return strlcpy(buf, "pernode", 20);
 112        default:
 113                return sprintf(buf, "%d", *ip);
 114        }
 115}
 116
/*
 * Expose svc_pool_map.mode as the "pool_mode" module parameter
 * (permissions 0644), read and written through the two handlers
 * above so that it appears as a mode name rather than a raw integer.
 */
module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
                 &svc_pool_map.mode, 0644);
 119
 120/*
 121 * Detect best pool mapping mode heuristically,
 122 * according to the machine's topology.
 123 */
 124static int
 125svc_pool_map_choose_mode(void)
 126{
 127        unsigned int node;
 128
 129        if (nr_online_nodes > 1) {
 130                /*
 131                 * Actually have multiple NUMA nodes,
 132                 * so split pools on NUMA node boundaries
 133                 */
 134                return SVC_POOL_PERNODE;
 135        }
 136
 137        node = first_online_node;
 138        if (nr_cpus_node(node) > 2) {
 139                /*
 140                 * Non-trivial SMP, or CONFIG_NUMA on
 141                 * non-NUMA hardware, e.g. with a generic
 142                 * x86_64 kernel on Xeons.  In this case we
 143                 * want to divide the pools on cpu boundaries.
 144                 */
 145                return SVC_POOL_PERCPU;
 146        }
 147
 148        /* default: one global pool */
 149        return SVC_POOL_GLOBAL;
 150}
 151
 152/*
 153 * Allocate the to_pool[] and pool_to[] arrays.
 154 * Returns 0 on success or an errno.
 155 */
 156static int
 157svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
 158{
 159        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 160        if (!m->to_pool)
 161                goto fail;
 162        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 163        if (!m->pool_to)
 164                goto fail_free;
 165
 166        return 0;
 167
 168fail_free:
 169        kfree(m->to_pool);
 170        m->to_pool = NULL;
 171fail:
 172        return -ENOMEM;
 173}
 174
 175/*
 176 * Initialise the pool map for SVC_POOL_PERCPU mode.
 177 * Returns number of pools or <0 on error.
 178 */
 179static int
 180svc_pool_map_init_percpu(struct svc_pool_map *m)
 181{
 182        unsigned int maxpools = nr_cpu_ids;
 183        unsigned int pidx = 0;
 184        unsigned int cpu;
 185        int err;
 186
 187        err = svc_pool_map_alloc_arrays(m, maxpools);
 188        if (err)
 189                return err;
 190
 191        for_each_online_cpu(cpu) {
 192                BUG_ON(pidx > maxpools);
 193                m->to_pool[cpu] = pidx;
 194                m->pool_to[pidx] = cpu;
 195                pidx++;
 196        }
 197        /* cpus brought online later all get mapped to pool0, sorry */
 198
 199        return pidx;
 200};
 201
 202
 203/*
 204 * Initialise the pool map for SVC_POOL_PERNODE mode.
 205 * Returns number of pools or <0 on error.
 206 */
 207static int
 208svc_pool_map_init_pernode(struct svc_pool_map *m)
 209{
 210        unsigned int maxpools = nr_node_ids;
 211        unsigned int pidx = 0;
 212        unsigned int node;
 213        int err;
 214
 215        err = svc_pool_map_alloc_arrays(m, maxpools);
 216        if (err)
 217                return err;
 218
 219        for_each_node_with_cpus(node) {
 220                /* some architectures (e.g. SN2) have cpuless nodes */
 221                BUG_ON(pidx > maxpools);
 222                m->to_pool[node] = pidx;
 223                m->pool_to[pidx] = node;
 224                pidx++;
 225        }
 226        /* nodes brought online later all get mapped to pool0, sorry */
 227
 228        return pidx;
 229}
 230
 231
 232/*
 233 * Add a reference to the global map of cpus to pools (and
 234 * vice versa).  Initialise the map if we're the first user.
 235 * Returns the number of pools.
 236 */
 237static unsigned int
 238svc_pool_map_get(void)
 239{
 240        struct svc_pool_map *m = &svc_pool_map;
 241        int npools = -1;
 242
 243        mutex_lock(&svc_pool_map_mutex);
 244
 245        if (m->count++) {
 246                mutex_unlock(&svc_pool_map_mutex);
 247                return m->npools;
 248        }
 249
 250        if (m->mode == SVC_POOL_AUTO)
 251                m->mode = svc_pool_map_choose_mode();
 252
 253        switch (m->mode) {
 254        case SVC_POOL_PERCPU:
 255                npools = svc_pool_map_init_percpu(m);
 256                break;
 257        case SVC_POOL_PERNODE:
 258                npools = svc_pool_map_init_pernode(m);
 259                break;
 260        }
 261
 262        if (npools < 0) {
 263                /* default, or memory allocation failure */
 264                npools = 1;
 265                m->mode = SVC_POOL_GLOBAL;
 266        }
 267        m->npools = npools;
 268
 269        mutex_unlock(&svc_pool_map_mutex);
 270        return m->npools;
 271}
 272
 273
 274/*
 275 * Drop a reference to the global map of cpus to pools.
 276 * When the last reference is dropped, the map data is
 277 * freed; this allows the sysadmin to change the pool
 278 * mode using the pool_mode module option without
 279 * rebooting or re-loading sunrpc.ko.
 280 */
 281static void
 282svc_pool_map_put(void)
 283{
 284        struct svc_pool_map *m = &svc_pool_map;
 285
 286        mutex_lock(&svc_pool_map_mutex);
 287
 288        if (!--m->count) {
 289                kfree(m->to_pool);
 290                m->to_pool = NULL;
 291                kfree(m->pool_to);
 292                m->pool_to = NULL;
 293                m->npools = 0;
 294        }
 295
 296        mutex_unlock(&svc_pool_map_mutex);
 297}
 298
 299
 300static int svc_pool_map_get_node(unsigned int pidx)
 301{
 302        const struct svc_pool_map *m = &svc_pool_map;
 303
 304        if (m->count) {
 305                if (m->mode == SVC_POOL_PERCPU)
 306                        return cpu_to_node(m->pool_to[pidx]);
 307                if (m->mode == SVC_POOL_PERNODE)
 308                        return m->pool_to[pidx];
 309        }
 310        return NUMA_NO_NODE;
 311}
 312/*
 313 * Set the given thread's cpus_allowed mask so that it
 314 * will only run on cpus in the given pool.
 315 */
 316static inline void
 317svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
 318{
 319        struct svc_pool_map *m = &svc_pool_map;
 320        unsigned int node = m->pool_to[pidx];
 321
 322        /*
 323         * The caller checks for sv_nrpools > 1, which
 324         * implies that we've been initialized.
 325         */
 326        WARN_ON_ONCE(m->count == 0);
 327        if (m->count == 0)
 328                return;
 329
 330        switch (m->mode) {
 331        case SVC_POOL_PERCPU:
 332        {
 333                set_cpus_allowed_ptr(task, cpumask_of(node));
 334                break;
 335        }
 336        case SVC_POOL_PERNODE:
 337        {
 338                set_cpus_allowed_ptr(task, cpumask_of_node(node));
 339                break;
 340        }
 341        }
 342}
 343
 344/*
 345 * Use the mapping mode to choose a pool for a given CPU.
 346 * Used when enqueueing an incoming RPC.  Always returns
 347 * a non-NULL pool pointer.
 348 */
 349struct svc_pool *
 350svc_pool_for_cpu(struct svc_serv *serv, int cpu)
 351{
 352        struct svc_pool_map *m = &svc_pool_map;
 353        unsigned int pidx = 0;
 354
 355        /*
 356         * An uninitialised map happens in a pure client when
 357         * lockd is brought up, so silently treat it the
 358         * same as SVC_POOL_GLOBAL.
 359         */
 360        if (svc_serv_is_pooled(serv)) {
 361                switch (m->mode) {
 362                case SVC_POOL_PERCPU:
 363                        pidx = m->to_pool[cpu];
 364                        break;
 365                case SVC_POOL_PERNODE:
 366                        pidx = m->to_pool[cpu_to_node(cpu)];
 367                        break;
 368                }
 369        }
 370        return &serv->sv_pools[pidx % serv->sv_nrpools];
 371}
 372
 373int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
 374{
 375        int err;
 376
 377        err = rpcb_create_local(net);
 378        if (err)
 379                return err;
 380
 381        /* Remove any stale portmap registrations */
 382        svc_unregister(serv, net);
 383        return 0;
 384}
 385EXPORT_SYMBOL_GPL(svc_rpcb_setup);
 386
 387void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
 388{
 389        svc_unregister(serv, net);
 390        rpcb_put_local(net);
 391}
 392EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
 393
 394static int svc_uses_rpcbind(struct svc_serv *serv)
 395{
 396        struct svc_program      *progp;
 397        unsigned int            i;
 398
 399        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 400                for (i = 0; i < progp->pg_nvers; i++) {
 401                        if (progp->pg_vers[i] == NULL)
 402                                continue;
 403                        if (progp->pg_vers[i]->vs_hidden == 0)
 404                                return 1;
 405                }
 406        }
 407
 408        return 0;
 409}
 410
 411int svc_bind(struct svc_serv *serv, struct net *net)
 412{
 413        if (!svc_uses_rpcbind(serv))
 414                return 0;
 415        return svc_rpcb_setup(serv, net);
 416}
 417EXPORT_SYMBOL_GPL(svc_bind);
 418
 419/*
 420 * Create an RPC service
 421 */
 422static struct svc_serv *
 423__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 424             void (*shutdown)(struct svc_serv *serv, struct net *net))
 425{
 426        struct svc_serv *serv;
 427        unsigned int vers;
 428        unsigned int xdrsize;
 429        unsigned int i;
 430
 431        if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
 432                return NULL;
 433        serv->sv_name      = prog->pg_name;
 434        serv->sv_program   = prog;
 435        serv->sv_nrthreads = 1;
 436        serv->sv_stats     = prog->pg_stats;
 437        if (bufsize > RPCSVC_MAXPAYLOAD)
 438                bufsize = RPCSVC_MAXPAYLOAD;
 439        serv->sv_max_payload = bufsize? bufsize : 4096;
 440        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
 441        serv->sv_shutdown  = shutdown;
 442        xdrsize = 0;
 443        while (prog) {
 444                prog->pg_lovers = prog->pg_nvers-1;
 445                for (vers=0; vers<prog->pg_nvers ; vers++)
 446                        if (prog->pg_vers[vers]) {
 447                                prog->pg_hivers = vers;
 448                                if (prog->pg_lovers > vers)
 449                                        prog->pg_lovers = vers;
 450                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
 451                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
 452                        }
 453                prog = prog->pg_next;
 454        }
 455        serv->sv_xdrsize   = xdrsize;
 456        INIT_LIST_HEAD(&serv->sv_tempsocks);
 457        INIT_LIST_HEAD(&serv->sv_permsocks);
 458        init_timer(&serv->sv_temptimer);
 459        spin_lock_init(&serv->sv_lock);
 460
 461        serv->sv_nrpools = npools;
 462        serv->sv_pools =
 463                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
 464                        GFP_KERNEL);
 465        if (!serv->sv_pools) {
 466                kfree(serv);
 467                return NULL;
 468        }
 469
 470        for (i = 0; i < serv->sv_nrpools; i++) {
 471                struct svc_pool *pool = &serv->sv_pools[i];
 472
 473                dprintk("svc: initialising pool %u for %s\n",
 474                                i, serv->sv_name);
 475
 476                pool->sp_id = i;
 477                INIT_LIST_HEAD(&pool->sp_threads);
 478                INIT_LIST_HEAD(&pool->sp_sockets);
 479                INIT_LIST_HEAD(&pool->sp_all_threads);
 480                spin_lock_init(&pool->sp_lock);
 481        }
 482
 483        if (svc_uses_rpcbind(serv) && (!serv->sv_shutdown))
 484                serv->sv_shutdown = svc_rpcb_cleanup;
 485
 486        return serv;
 487}
 488
 489struct svc_serv *
 490svc_create(struct svc_program *prog, unsigned int bufsize,
 491           void (*shutdown)(struct svc_serv *serv, struct net *net))
 492{
 493        return __svc_create(prog, bufsize, /*npools*/1, shutdown);
 494}
 495EXPORT_SYMBOL_GPL(svc_create);
 496
 497struct svc_serv *
 498svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 499                  void (*shutdown)(struct svc_serv *serv, struct net *net),
 500                  svc_thread_fn func, struct module *mod)
 501{
 502        struct svc_serv *serv;
 503        unsigned int npools = svc_pool_map_get();
 504
 505        serv = __svc_create(prog, bufsize, npools, shutdown);
 506
 507        if (serv != NULL) {
 508                serv->sv_function = func;
 509                serv->sv_module = mod;
 510        }
 511
 512        return serv;
 513}
 514EXPORT_SYMBOL_GPL(svc_create_pooled);
 515
 516void svc_shutdown_net(struct svc_serv *serv, struct net *net)
 517{
 518        svc_close_net(serv, net);
 519
 520        if (serv->sv_shutdown)
 521                serv->sv_shutdown(serv, net);
 522}
 523EXPORT_SYMBOL_GPL(svc_shutdown_net);
 524
 525/*
 526 * Destroy an RPC service. Should be called with appropriate locking to
 527 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 528 */
 529void
 530svc_destroy(struct svc_serv *serv)
 531{
 532        dprintk("svc: svc_destroy(%s, %d)\n",
 533                                serv->sv_program->pg_name,
 534                                serv->sv_nrthreads);
 535
 536        if (serv->sv_nrthreads) {
 537                if (--(serv->sv_nrthreads) != 0) {
 538                        svc_sock_update_bufs(serv);
 539                        return;
 540                }
 541        } else
 542                printk("svc_destroy: no threads for serv=%p!\n", serv);
 543
 544        del_timer_sync(&serv->sv_temptimer);
 545
 546        /*
 547         * The last user is gone and thus all sockets have to be destroyed to
 548         * the point. Check this.
 549         */
 550        BUG_ON(!list_empty(&serv->sv_permsocks));
 551        BUG_ON(!list_empty(&serv->sv_tempsocks));
 552
 553        cache_clean_deferred(serv);
 554
 555        if (svc_serv_is_pooled(serv))
 556                svc_pool_map_put();
 557
 558        kfree(serv->sv_pools);
 559        kfree(serv);
 560}
 561EXPORT_SYMBOL_GPL(svc_destroy);
 562
 563/*
 564 * Allocate an RPC server's buffer space.
 565 * We allocate pages and place them in rq_argpages.
 566 */
 567static int
 568svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
 569{
 570        unsigned int pages, arghi;
 571
 572        /* bc_xprt uses fore channel allocated buffers */
 573        if (svc_is_backchannel(rqstp))
 574                return 1;
 575
 576        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
 577                                       * We assume one is at most one page
 578                                       */
 579        arghi = 0;
 580        WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
 581        if (pages > RPCSVC_MAXPAGES)
 582                pages = RPCSVC_MAXPAGES;
 583        while (pages) {
 584                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
 585                if (!p)
 586                        break;
 587                rqstp->rq_pages[arghi++] = p;
 588                pages--;
 589        }
 590        return pages == 0;
 591}
 592
 593/*
 594 * Release an RPC server buffer
 595 */
 596static void
 597svc_release_buffer(struct svc_rqst *rqstp)
 598{
 599        unsigned int i;
 600
 601        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
 602                if (rqstp->rq_pages[i])
 603                        put_page(rqstp->rq_pages[i]);
 604}
 605
 606struct svc_rqst *
 607svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 608{
 609        struct svc_rqst *rqstp;
 610
 611        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
 612        if (!rqstp)
 613                goto out_enomem;
 614
 615        init_waitqueue_head(&rqstp->rq_wait);
 616
 617        serv->sv_nrthreads++;
 618        spin_lock_bh(&pool->sp_lock);
 619        pool->sp_nrthreads++;
 620        list_add(&rqstp->rq_all, &pool->sp_all_threads);
 621        spin_unlock_bh(&pool->sp_lock);
 622        rqstp->rq_server = serv;
 623        rqstp->rq_pool = pool;
 624
 625        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 626        if (!rqstp->rq_argp)
 627                goto out_thread;
 628
 629        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 630        if (!rqstp->rq_resp)
 631                goto out_thread;
 632
 633        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
 634                goto out_thread;
 635
 636        return rqstp;
 637out_thread:
 638        svc_exit_thread(rqstp);
 639out_enomem:
 640        return ERR_PTR(-ENOMEM);
 641}
 642EXPORT_SYMBOL_GPL(svc_prepare_thread);
 643
 644/*
 645 * Choose a pool in which to create a new thread, for svc_set_num_threads
 646 */
 647static inline struct svc_pool *
 648choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 649{
 650        if (pool != NULL)
 651                return pool;
 652
 653        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
 654}
 655
 656/*
 657 * Choose a thread to kill, for svc_set_num_threads
 658 */
 659static inline struct task_struct *
 660choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 661{
 662        unsigned int i;
 663        struct task_struct *task = NULL;
 664
 665        if (pool != NULL) {
 666                spin_lock_bh(&pool->sp_lock);
 667        } else {
 668                /* choose a pool in round-robin fashion */
 669                for (i = 0; i < serv->sv_nrpools; i++) {
 670                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
 671                        spin_lock_bh(&pool->sp_lock);
 672                        if (!list_empty(&pool->sp_all_threads))
 673                                goto found_pool;
 674                        spin_unlock_bh(&pool->sp_lock);
 675                }
 676                return NULL;
 677        }
 678
 679found_pool:
 680        if (!list_empty(&pool->sp_all_threads)) {
 681                struct svc_rqst *rqstp;
 682
 683                /*
 684                 * Remove from the pool->sp_all_threads list
 685                 * so we don't try to kill it again.
 686                 */
 687                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
 688                list_del_init(&rqstp->rq_all);
 689                task = rqstp->rq_task;
 690        }
 691        spin_unlock_bh(&pool->sp_lock);
 692
 693        return task;
 694}
 695
 696/*
 697 * Create or destroy enough new threads to make the number
 698 * of threads the given number.  If `pool' is non-NULL, applies
 699 * only to threads in that pool, otherwise round-robins between
 700 * all pools.  Caller must ensure that mutual exclusion between this and
 701 * server startup or shutdown.
 702 *
 703 * Destroying threads relies on the service threads filling in
 704 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 705 * has been created using svc_create_pooled().
 706 *
 707 * Based on code that used to be in nfsd_svc() but tweaked
 708 * to be pool-aware.
 709 */
 710int
 711svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 712{
 713        struct svc_rqst *rqstp;
 714        struct task_struct *task;
 715        struct svc_pool *chosen_pool;
 716        int error = 0;
 717        unsigned int state = serv->sv_nrthreads-1;
 718        int node;
 719
 720        if (pool == NULL) {
 721                /* The -1 assumes caller has done a svc_get() */
 722                nrservs -= (serv->sv_nrthreads-1);
 723        } else {
 724                spin_lock_bh(&pool->sp_lock);
 725                nrservs -= pool->sp_nrthreads;
 726                spin_unlock_bh(&pool->sp_lock);
 727        }
 728
 729        /* create new threads */
 730        while (nrservs > 0) {
 731                nrservs--;
 732                chosen_pool = choose_pool(serv, pool, &state);
 733
 734                node = svc_pool_map_get_node(chosen_pool->sp_id);
 735                rqstp = svc_prepare_thread(serv, chosen_pool, node);
 736                if (IS_ERR(rqstp)) {
 737                        error = PTR_ERR(rqstp);
 738                        break;
 739                }
 740
 741                __module_get(serv->sv_module);
 742                task = kthread_create_on_node(serv->sv_function, rqstp,
 743                                              node, serv->sv_name);
 744                if (IS_ERR(task)) {
 745                        error = PTR_ERR(task);
 746                        module_put(serv->sv_module);
 747                        svc_exit_thread(rqstp);
 748                        break;
 749                }
 750
 751                rqstp->rq_task = task;
 752                if (serv->sv_nrpools > 1)
 753                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
 754
 755                svc_sock_update_bufs(serv);
 756                wake_up_process(task);
 757        }
 758        /* destroy old threads */
 759        while (nrservs < 0 &&
 760               (task = choose_victim(serv, pool, &state)) != NULL) {
 761                send_sig(SIGINT, task, 1);
 762                nrservs++;
 763        }
 764
 765        return error;
 766}
 767EXPORT_SYMBOL_GPL(svc_set_num_threads);
 768
 769/*
 770 * Called from a server thread as it's exiting. Caller must hold the BKL or
 771 * the "service mutex", whichever is appropriate for the service.
 772 */
 773void
 774svc_exit_thread(struct svc_rqst *rqstp)
 775{
 776        struct svc_serv *serv = rqstp->rq_server;
 777        struct svc_pool *pool = rqstp->rq_pool;
 778
 779        svc_release_buffer(rqstp);
 780        kfree(rqstp->rq_resp);
 781        kfree(rqstp->rq_argp);
 782        kfree(rqstp->rq_auth_data);
 783
 784        spin_lock_bh(&pool->sp_lock);
 785        pool->sp_nrthreads--;
 786        list_del(&rqstp->rq_all);
 787        spin_unlock_bh(&pool->sp_lock);
 788
 789        kfree(rqstp);
 790
 791        /* Release the server */
 792        if (serv)
 793                svc_destroy(serv);
 794}
 795EXPORT_SYMBOL_GPL(svc_exit_thread);
 796
 797/*
 798 * Register an "inet" protocol family netid with the local
 799 * rpcbind daemon via an rpcbind v4 SET request.
 800 *
 801 * No netconfig infrastructure is available in the kernel, so
 802 * we map IP_ protocol numbers to netids by hand.
 803 *
 804 * Returns zero on success; a negative errno value is returned
 805 * if any error occurs.
 806 */
 807static int __svc_rpcb_register4(struct net *net, const u32 program,
 808                                const u32 version,
 809                                const unsigned short protocol,
 810                                const unsigned short port)
 811{
 812        const struct sockaddr_in sin = {
 813                .sin_family             = AF_INET,
 814                .sin_addr.s_addr        = htonl(INADDR_ANY),
 815                .sin_port               = htons(port),
 816        };
 817        const char *netid;
 818        int error;
 819
 820        switch (protocol) {
 821        case IPPROTO_UDP:
 822                netid = RPCBIND_NETID_UDP;
 823                break;
 824        case IPPROTO_TCP:
 825                netid = RPCBIND_NETID_TCP;
 826                break;
 827        default:
 828                return -ENOPROTOOPT;
 829        }
 830
 831        error = rpcb_v4_register(net, program, version,
 832                                        (const struct sockaddr *)&sin, netid);
 833
 834        /*
 835         * User space didn't support rpcbind v4, so retry this
 836         * registration request with the legacy rpcbind v2 protocol.
 837         */
 838        if (error == -EPROTONOSUPPORT)
 839                error = rpcb_register(net, program, version, protocol, port);
 840
 841        return error;
 842}
 843
 844#if IS_ENABLED(CONFIG_IPV6)
 845/*
 846 * Register an "inet6" protocol family netid with the local
 847 * rpcbind daemon via an rpcbind v4 SET request.
 848 *
 849 * No netconfig infrastructure is available in the kernel, so
 850 * we map IP_ protocol numbers to netids by hand.
 851 *
 852 * Returns zero on success; a negative errno value is returned
 853 * if any error occurs.
 854 */
 855static int __svc_rpcb_register6(struct net *net, const u32 program,
 856                                const u32 version,
 857                                const unsigned short protocol,
 858                                const unsigned short port)
 859{
 860        const struct sockaddr_in6 sin6 = {
 861                .sin6_family            = AF_INET6,
 862                .sin6_addr              = IN6ADDR_ANY_INIT,
 863                .sin6_port              = htons(port),
 864        };
 865        const char *netid;
 866        int error;
 867
 868        switch (protocol) {
 869        case IPPROTO_UDP:
 870                netid = RPCBIND_NETID_UDP6;
 871                break;
 872        case IPPROTO_TCP:
 873                netid = RPCBIND_NETID_TCP6;
 874                break;
 875        default:
 876                return -ENOPROTOOPT;
 877        }
 878
 879        error = rpcb_v4_register(net, program, version,
 880                                        (const struct sockaddr *)&sin6, netid);
 881
 882        /*
 883         * User space didn't support rpcbind version 4, so we won't
 884         * use a PF_INET6 listener.
 885         */
 886        if (error == -EPROTONOSUPPORT)
 887                error = -EAFNOSUPPORT;
 888
 889        return error;
 890}
 891#endif  /* IS_ENABLED(CONFIG_IPV6) */
 892
 893/*
 894 * Register a kernel RPC service via rpcbind version 4.
 895 *
 896 * Returns zero on success; a negative errno value is returned
 897 * if any error occurs.
 898 */
 899static int __svc_register(struct net *net, const char *progname,
 900                          const u32 program, const u32 version,
 901                          const int family,
 902                          const unsigned short protocol,
 903                          const unsigned short port)
 904{
 905        int error = -EAFNOSUPPORT;
 906
 907        switch (family) {
 908        case PF_INET:
 909                error = __svc_rpcb_register4(net, program, version,
 910                                                protocol, port);
 911                break;
 912#if IS_ENABLED(CONFIG_IPV6)
 913        case PF_INET6:
 914                error = __svc_rpcb_register6(net, program, version,
 915                                                protocol, port);
 916#endif
 917        }
 918
 919        if (error < 0)
 920                printk(KERN_WARNING "svc: failed to register %sv%u RPC "
 921                        "service (errno %d).\n", progname, version, -error);
 922        return error;
 923}
 924
 925/**
 926 * svc_register - register an RPC service with the local portmapper
 927 * @serv: svc_serv struct for the service to register
 928 * @net: net namespace for the service to register
 929 * @family: protocol family of service's listener socket
 930 * @proto: transport protocol number to advertise
 931 * @port: port to advertise
 932 *
 933 * Service is registered for any address in the passed-in protocol family
 934 */
 935int svc_register(const struct svc_serv *serv, struct net *net,
 936                 const int family, const unsigned short proto,
 937                 const unsigned short port)
 938{
 939        struct svc_program      *progp;
 940        unsigned int            i;
 941        int                     error = 0;
 942
 943        WARN_ON_ONCE(proto == 0 && port == 0);
 944        if (proto == 0 && port == 0)
 945                return -EINVAL;
 946
 947        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 948                for (i = 0; i < progp->pg_nvers; i++) {
 949                        if (progp->pg_vers[i] == NULL)
 950                                continue;
 951
 952                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
 953                                        progp->pg_name,
 954                                        i,
 955                                        proto == IPPROTO_UDP?  "udp" : "tcp",
 956                                        port,
 957                                        family,
 958                                        progp->pg_vers[i]->vs_hidden?
 959                                                " (but not telling portmap)" : "");
 960
 961                        if (progp->pg_vers[i]->vs_hidden)
 962                                continue;
 963
 964                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
 965                                                i, family, proto, port);
 966                        if (error < 0)
 967                                break;
 968                }
 969        }
 970
 971        return error;
 972}
 973
 974/*
 975 * If user space is running rpcbind, it should take the v4 UNSET
 976 * and clear everything for this [program, version].  If user space
 977 * is running portmap, it will reject the v4 UNSET, but won't have
 978 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 979 * in this case to clear all existing entries for [program, version].
 980 */
 981static void __svc_unregister(struct net *net, const u32 program, const u32 version,
 982                             const char *progname)
 983{
 984        int error;
 985
 986        error = rpcb_v4_register(net, program, version, NULL, "");
 987
 988        /*
 989         * User space didn't support rpcbind v4, so retry this
 990         * request with the legacy rpcbind v2 protocol.
 991         */
 992        if (error == -EPROTONOSUPPORT)
 993                error = rpcb_register(net, program, version, 0, 0);
 994
 995        dprintk("svc: %s(%sv%u), error %d\n",
 996                        __func__, progname, version, error);
 997}
 998
 999/*
1000 * All netids, bind addresses and ports registered for [program, version]
1001 * are removed from the local rpcbind database (if the service is not
1002 * hidden) to make way for a new instance of the service.
1003 *
1004 * The result of unregistration is reported via dprintk for those who want
1005 * verification of the result, but is otherwise not important.
1006 */
1007static void svc_unregister(const struct svc_serv *serv, struct net *net)
1008{
1009        struct svc_program *progp;
1010        unsigned long flags;
1011        unsigned int i;
1012
1013        clear_thread_flag(TIF_SIGPENDING);
1014
1015        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1016                for (i = 0; i < progp->pg_nvers; i++) {
1017                        if (progp->pg_vers[i] == NULL)
1018                                continue;
1019                        if (progp->pg_vers[i]->vs_hidden)
1020                                continue;
1021
1022                        dprintk("svc: attempting to unregister %sv%u\n",
1023                                progp->pg_name, i);
1024                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
1025                }
1026        }
1027
1028        spin_lock_irqsave(&current->sighand->siglock, flags);
1029        recalc_sigpending();
1030        spin_unlock_irqrestore(&current->sighand->siglock, flags);
1031}
1032
1033/*
1034 * dprintk the given error with the address of the client that caused it.
1035 */
1036static __printf(2, 3)
1037void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1038{
1039        struct va_format vaf;
1040        va_list args;
1041        char    buf[RPC_MAX_ADDRBUFLEN];
1042
1043        va_start(args, fmt);
1044
1045        vaf.fmt = fmt;
1046        vaf.va = &args;
1047
1048        dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
1049
1050        va_end(args);
1051}
1052
1053/*
1054 * Common routine for processing the RPC request.
1055 */
1056static int
1057svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1058{
1059        struct svc_program      *progp;
1060        struct svc_version      *versp = NULL;  /* compiler food */
1061        struct svc_procedure    *procp = NULL;
1062        struct svc_serv         *serv = rqstp->rq_server;
1063        kxdrproc_t              xdr;
1064        __be32                  *statp;
1065        u32                     prog, vers, proc;
1066        __be32                  auth_stat, rpc_stat;
1067        int                     auth_res;
1068        __be32                  *reply_statp;
1069
1070        rpc_stat = rpc_success;
1071
1072        if (argv->iov_len < 6*4)
1073                goto err_short_len;
1074
1075        /* Will be turned off only in gss privacy case: */
1076        rqstp->rq_splice_ok = 1;
1077        /* Will be turned off only when NFSv4 Sessions are used */
1078        rqstp->rq_usedeferral = 1;
1079        rqstp->rq_dropme = false;
1080
1081        /* Setup reply header */
1082        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
1083
1084        svc_putu32(resv, rqstp->rq_xid);
1085
1086        vers = svc_getnl(argv);
1087
1088        /* First words of reply: */
1089        svc_putnl(resv, 1);             /* REPLY */
1090
1091        if (vers != 2)          /* RPC version number */
1092                goto err_bad_rpc;
1093
1094        /* Save position in case we later decide to reject: */
1095        reply_statp = resv->iov_base + resv->iov_len;
1096
1097        svc_putnl(resv, 0);             /* ACCEPT */
1098
1099        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
1100        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
1101        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
1102
1103        progp = serv->sv_program;
1104
1105        for (progp = serv->sv_program; progp; progp = progp->pg_next)
1106                if (prog == progp->pg_prog)
1107                        break;
1108
1109        /*
1110         * Decode auth data, and add verifier to reply buffer.
1111         * We do this before anything else in order to get a decent
1112         * auth verifier.
1113         */
1114        auth_res = svc_authenticate(rqstp, &auth_stat);
1115        /* Also give the program a chance to reject this call: */
1116        if (auth_res == SVC_OK && progp) {
1117                auth_stat = rpc_autherr_badcred;
1118                auth_res = progp->pg_authenticate(rqstp);
1119        }
1120        switch (auth_res) {
1121        case SVC_OK:
1122                break;
1123        case SVC_GARBAGE:
1124                goto err_garbage;
1125        case SVC_SYSERR:
1126                rpc_stat = rpc_system_err;
1127                goto err_bad;
1128        case SVC_DENIED:
1129                goto err_bad_auth;
1130        case SVC_CLOSE:
1131                if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1132                        svc_close_xprt(rqstp->rq_xprt);
1133        case SVC_DROP:
1134                goto dropit;
1135        case SVC_COMPLETE:
1136                goto sendit;
1137        }
1138
1139        if (progp == NULL)
1140                goto err_bad_prog;
1141
1142        if (vers >= progp->pg_nvers ||
1143          !(versp = progp->pg_vers[vers]))
1144                goto err_bad_vers;
1145
1146        procp = versp->vs_proc + proc;
1147        if (proc >= versp->vs_nproc || !procp->pc_func)
1148                goto err_bad_proc;
1149        rqstp->rq_procinfo = procp;
1150
1151        /* Syntactic check complete */
1152        serv->sv_stats->rpccnt++;
1153
1154        /* Build the reply header. */
1155        statp = resv->iov_base +resv->iov_len;
1156        svc_putnl(resv, RPC_SUCCESS);
1157
1158        /* Bump per-procedure stats counter */
1159        procp->pc_count++;
1160
1161        /* Initialize storage for argp and resp */
1162        memset(rqstp->rq_argp, 0, procp->pc_argsize);
1163        memset(rqstp->rq_resp, 0, procp->pc_ressize);
1164
1165        /* un-reserve some of the out-queue now that we have a
1166         * better idea of reply size
1167         */
1168        if (procp->pc_xdrressize)
1169                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1170
1171        /* Call the function that processes the request. */
1172        if (!versp->vs_dispatch) {
1173                /* Decode arguments */
1174                xdr = procp->pc_decode;
1175                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
1176                        goto err_garbage;
1177
1178                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
1179
1180                /* Encode reply */
1181                if (rqstp->rq_dropme) {
1182                        if (procp->pc_release)
1183                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1184                        goto dropit;
1185                }
1186                if (*statp == rpc_success &&
1187                    (xdr = procp->pc_encode) &&
1188                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1189                        dprintk("svc: failed to encode reply\n");
1190                        /* serv->sv_stats->rpcsystemerr++; */
1191                        *statp = rpc_system_err;
1192                }
1193        } else {
1194                dprintk("svc: calling dispatcher\n");
1195                if (!versp->vs_dispatch(rqstp, statp)) {
1196                        /* Release reply info */
1197                        if (procp->pc_release)
1198                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1199                        goto dropit;
1200                }
1201        }
1202
1203        /* Check RPC status result */
1204        if (*statp != rpc_success)
1205                resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
1206
1207        /* Release reply info */
1208        if (procp->pc_release)
1209                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1210
1211        if (procp->pc_encode == NULL)
1212                goto dropit;
1213
1214 sendit:
1215        if (svc_authorise(rqstp))
1216                goto dropit;
1217        return 1;               /* Caller can now send it */
1218
1219 dropit:
1220        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
1221        dprintk("svc: svc_process dropit\n");
1222        return 0;
1223
1224err_short_len:
1225        svc_printk(rqstp, "short len %Zd, dropping request\n",
1226                        argv->iov_len);
1227
1228        goto dropit;                    /* drop request */
1229
1230err_bad_rpc:
1231        serv->sv_stats->rpcbadfmt++;
1232        svc_putnl(resv, 1);     /* REJECT */
1233        svc_putnl(resv, 0);     /* RPC_MISMATCH */
1234        svc_putnl(resv, 2);     /* Only RPCv2 supported */
1235        svc_putnl(resv, 2);
1236        goto sendit;
1237
1238err_bad_auth:
1239        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1240        serv->sv_stats->rpcbadauth++;
1241        /* Restore write pointer to location of accept status: */
1242        xdr_ressize_check(rqstp, reply_statp);
1243        svc_putnl(resv, 1);     /* REJECT */
1244        svc_putnl(resv, 1);     /* AUTH_ERROR */
1245        svc_putnl(resv, ntohl(auth_stat));      /* status */
1246        goto sendit;
1247
1248err_bad_prog:
1249        dprintk("svc: unknown program %d\n", prog);
1250        serv->sv_stats->rpcbadfmt++;
1251        svc_putnl(resv, RPC_PROG_UNAVAIL);
1252        goto sendit;
1253
1254err_bad_vers:
1255        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1256                       vers, prog, progp->pg_name);
1257
1258        serv->sv_stats->rpcbadfmt++;
1259        svc_putnl(resv, RPC_PROG_MISMATCH);
1260        svc_putnl(resv, progp->pg_lovers);
1261        svc_putnl(resv, progp->pg_hivers);
1262        goto sendit;
1263
1264err_bad_proc:
1265        svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1266
1267        serv->sv_stats->rpcbadfmt++;
1268        svc_putnl(resv, RPC_PROC_UNAVAIL);
1269        goto sendit;
1270
1271err_garbage:
1272        svc_printk(rqstp, "failed to decode args\n");
1273
1274        rpc_stat = rpc_garbage_args;
1275err_bad:
1276        serv->sv_stats->rpcbadfmt++;
1277        svc_putnl(resv, ntohl(rpc_stat));
1278        goto sendit;
1279}
1280EXPORT_SYMBOL_GPL(svc_process);
1281
1282/*
1283 * Process the RPC request.
1284 */
1285int
1286svc_process(struct svc_rqst *rqstp)
1287{
1288        struct kvec             *argv = &rqstp->rq_arg.head[0];
1289        struct kvec             *resv = &rqstp->rq_res.head[0];
1290        struct svc_serv         *serv = rqstp->rq_server;
1291        u32                     dir;
1292
1293        /*
1294         * Setup response xdr_buf.
1295         * Initially it has just one page
1296         */
1297        rqstp->rq_next_page = &rqstp->rq_respages[1];
1298        resv->iov_base = page_address(rqstp->rq_respages[0]);
1299        resv->iov_len = 0;
1300        rqstp->rq_res.pages = rqstp->rq_respages + 1;
1301        rqstp->rq_res.len = 0;
1302        rqstp->rq_res.page_base = 0;
1303        rqstp->rq_res.page_len = 0;
1304        rqstp->rq_res.buflen = PAGE_SIZE;
1305        rqstp->rq_res.tail[0].iov_base = NULL;
1306        rqstp->rq_res.tail[0].iov_len = 0;
1307
1308        rqstp->rq_xid = svc_getu32(argv);
1309
1310        dir  = svc_getnl(argv);
1311        if (dir != 0) {
1312                /* direction != CALL */
1313                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1314                serv->sv_stats->rpcbadfmt++;
1315                svc_drop(rqstp);
1316                return 0;
1317        }
1318
1319        /* Returns 1 for send, 0 for drop */
1320        if (svc_process_common(rqstp, argv, resv))
1321                return svc_send(rqstp);
1322        else {
1323                svc_drop(rqstp);
1324                return 0;
1325        }
1326}
1327
1328#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1329/*
1330 * Process a backchannel RPC request that arrived over an existing
1331 * outbound connection
1332 */
1333int
1334bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1335               struct svc_rqst *rqstp)
1336{
1337        struct kvec     *argv = &rqstp->rq_arg.head[0];
1338        struct kvec     *resv = &rqstp->rq_res.head[0];
1339
1340        /* Build the svc_rqst used by the common processing routine */
1341        rqstp->rq_xprt = serv->sv_bc_xprt;
1342        rqstp->rq_xid = req->rq_xid;
1343        rqstp->rq_prot = req->rq_xprt->prot;
1344        rqstp->rq_server = serv;
1345
1346        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
1347        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
1348        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
1349        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1350
1351        /* reset result send buffer "put" position */
1352        resv->iov_len = 0;
1353
1354        if (rqstp->rq_prot != IPPROTO_TCP) {
1355                printk(KERN_ERR "No support for Non-TCP transports!\n");
1356                BUG();
1357        }
1358
1359        /*
1360         * Skip the next two words because they've already been
1361         * processed in the trasport
1362         */
1363        svc_getu32(argv);       /* XID */
1364        svc_getnl(argv);        /* CALLDIR */
1365
1366        /* Returns 1 for send, 0 for drop */
1367        if (svc_process_common(rqstp, argv, resv)) {
1368                memcpy(&req->rq_snd_buf, &rqstp->rq_res,
1369                                                sizeof(req->rq_snd_buf));
1370                return bc_send(req);
1371        } else {
1372                /* drop request */
1373                xprt_free_bc_request(req);
1374                return 0;
1375        }
1376}
1377EXPORT_SYMBOL_GPL(bc_svc_process);
1378#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1379
1380/*
1381 * Return (transport-specific) limit on the rpc payload.
1382 */
1383u32 svc_max_payload(const struct svc_rqst *rqstp)
1384{
1385        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1386
1387        if (rqstp->rq_server->sv_max_payload < max)
1388                max = rqstp->rq_server->sv_max_payload;
1389        return max;
1390}
1391EXPORT_SYMBOL_GPL(svc_max_payload);
1392