linux/net/sunrpc/svc.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/svc.c
   3 *
   4 * High-level RPC service routines
   5 *
   6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
   7 *
   8 * Multiple threads pools and NUMAisation
   9 * Copyright (c) 2006 Silicon Graphics, Inc.
  10 * by Greg Banks <gnb@melbourne.sgi.com>
  11 */
  12
  13#include <linux/linkage.h>
  14#include <linux/sched.h>
  15#include <linux/errno.h>
  16#include <linux/net.h>
  17#include <linux/in.h>
  18#include <linux/mm.h>
  19#include <linux/interrupt.h>
  20#include <linux/module.h>
  21#include <linux/kthread.h>
  22#include <linux/slab.h>
  23#include <linux/nsproxy.h>
  24
  25#include <linux/sunrpc/types.h>
  26#include <linux/sunrpc/xdr.h>
  27#include <linux/sunrpc/stats.h>
  28#include <linux/sunrpc/svcsock.h>
  29#include <linux/sunrpc/clnt.h>
  30#include <linux/sunrpc/bc_xprt.h>
  31
  32#define RPCDBG_FACILITY RPCDBG_SVCDSP
  33
  34static void svc_unregister(const struct svc_serv *serv, struct net *net);
  35
  36#define svc_serv_is_pooled(serv)    ((serv)->sv_function)
  37
  38/*
  39 * Mode for mapping cpus to pools.
  40 */
  41enum {
  42        SVC_POOL_AUTO = -1,     /* choose one of the others */
  43        SVC_POOL_GLOBAL,        /* no mapping, just a single global pool
  44                                 * (legacy & UP mode) */
  45        SVC_POOL_PERCPU,        /* one pool per cpu */
  46        SVC_POOL_PERNODE        /* one pool per numa node */
  47};
  48#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL
  49
  50/*
  51 * Structure for mapping cpus to pools and vice versa.
  52 * Setup once during sunrpc initialisation.
  53 */
  54static struct svc_pool_map {
  55        int count;                      /* How many svc_servs use us */
  56        int mode;                       /* Note: int not enum to avoid
  57                                         * warnings about "enumeration value
  58                                         * not handled in switch" */
  59        unsigned int npools;
  60        unsigned int *pool_to;          /* maps pool id to cpu or node */
  61        unsigned int *to_pool;          /* maps cpu or node to pool id */
  62} svc_pool_map = {
  63        .count = 0,
  64        .mode = SVC_POOL_DEFAULT
  65};
  66static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */
  67
  68static int
  69param_set_pool_mode(const char *val, struct kernel_param *kp)
  70{
  71        int *ip = (int *)kp->arg;
  72        struct svc_pool_map *m = &svc_pool_map;
  73        int err;
  74
  75        mutex_lock(&svc_pool_map_mutex);
  76
  77        err = -EBUSY;
  78        if (m->count)
  79                goto out;
  80
  81        err = 0;
  82        if (!strncmp(val, "auto", 4))
  83                *ip = SVC_POOL_AUTO;
  84        else if (!strncmp(val, "global", 6))
  85                *ip = SVC_POOL_GLOBAL;
  86        else if (!strncmp(val, "percpu", 6))
  87                *ip = SVC_POOL_PERCPU;
  88        else if (!strncmp(val, "pernode", 7))
  89                *ip = SVC_POOL_PERNODE;
  90        else
  91                err = -EINVAL;
  92
  93out:
  94        mutex_unlock(&svc_pool_map_mutex);
  95        return err;
  96}
  97
  98static int
  99param_get_pool_mode(char *buf, struct kernel_param *kp)
 100{
 101        int *ip = (int *)kp->arg;
 102
 103        switch (*ip)
 104        {
 105        case SVC_POOL_AUTO:
 106                return strlcpy(buf, "auto", 20);
 107        case SVC_POOL_GLOBAL:
 108                return strlcpy(buf, "global", 20);
 109        case SVC_POOL_PERCPU:
 110                return strlcpy(buf, "percpu", 20);
 111        case SVC_POOL_PERNODE:
 112                return strlcpy(buf, "pernode", 20);
 113        default:
 114                return sprintf(buf, "%d", *ip);
 115        }
 116}
 117
 118module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
 119                 &svc_pool_map.mode, 0644);
 120
 121/*
 122 * Detect best pool mapping mode heuristically,
 123 * according to the machine's topology.
 124 */
 125static int
 126svc_pool_map_choose_mode(void)
 127{
 128        unsigned int node;
 129
 130        if (nr_online_nodes > 1) {
 131                /*
 132                 * Actually have multiple NUMA nodes,
 133                 * so split pools on NUMA node boundaries
 134                 */
 135                return SVC_POOL_PERNODE;
 136        }
 137
 138        node = first_online_node;
 139        if (nr_cpus_node(node) > 2) {
 140                /*
 141                 * Non-trivial SMP, or CONFIG_NUMA on
 142                 * non-NUMA hardware, e.g. with a generic
 143                 * x86_64 kernel on Xeons.  In this case we
 144                 * want to divide the pools on cpu boundaries.
 145                 */
 146                return SVC_POOL_PERCPU;
 147        }
 148
 149        /* default: one global pool */
 150        return SVC_POOL_GLOBAL;
 151}
 152
 153/*
 154 * Allocate the to_pool[] and pool_to[] arrays.
 155 * Returns 0 on success or an errno.
 156 */
 157static int
 158svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
 159{
 160        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 161        if (!m->to_pool)
 162                goto fail;
 163        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 164        if (!m->pool_to)
 165                goto fail_free;
 166
 167        return 0;
 168
 169fail_free:
 170        kfree(m->to_pool);
 171        m->to_pool = NULL;
 172fail:
 173        return -ENOMEM;
 174}
 175
 176/*
 177 * Initialise the pool map for SVC_POOL_PERCPU mode.
 178 * Returns number of pools or <0 on error.
 179 */
 180static int
 181svc_pool_map_init_percpu(struct svc_pool_map *m)
 182{
 183        unsigned int maxpools = nr_cpu_ids;
 184        unsigned int pidx = 0;
 185        unsigned int cpu;
 186        int err;
 187
 188        err = svc_pool_map_alloc_arrays(m, maxpools);
 189        if (err)
 190                return err;
 191
 192        for_each_online_cpu(cpu) {
 193                BUG_ON(pidx > maxpools);
 194                m->to_pool[cpu] = pidx;
 195                m->pool_to[pidx] = cpu;
 196                pidx++;
 197        }
 198        /* cpus brought online later all get mapped to pool0, sorry */
 199
 200        return pidx;
 201};
 202
 203
 204/*
 205 * Initialise the pool map for SVC_POOL_PERNODE mode.
 206 * Returns number of pools or <0 on error.
 207 */
 208static int
 209svc_pool_map_init_pernode(struct svc_pool_map *m)
 210{
 211        unsigned int maxpools = nr_node_ids;
 212        unsigned int pidx = 0;
 213        unsigned int node;
 214        int err;
 215
 216        err = svc_pool_map_alloc_arrays(m, maxpools);
 217        if (err)
 218                return err;
 219
 220        for_each_node_with_cpus(node) {
 221                /* some architectures (e.g. SN2) have cpuless nodes */
 222                BUG_ON(pidx > maxpools);
 223                m->to_pool[node] = pidx;
 224                m->pool_to[pidx] = node;
 225                pidx++;
 226        }
 227        /* nodes brought online later all get mapped to pool0, sorry */
 228
 229        return pidx;
 230}
 231
 232
 233/*
 234 * Add a reference to the global map of cpus to pools (and
 235 * vice versa).  Initialise the map if we're the first user.
 236 * Returns the number of pools.
 237 */
 238static unsigned int
 239svc_pool_map_get(void)
 240{
 241        struct svc_pool_map *m = &svc_pool_map;
 242        int npools = -1;
 243
 244        mutex_lock(&svc_pool_map_mutex);
 245
 246        if (m->count++) {
 247                mutex_unlock(&svc_pool_map_mutex);
 248                return m->npools;
 249        }
 250
 251        if (m->mode == SVC_POOL_AUTO)
 252                m->mode = svc_pool_map_choose_mode();
 253
 254        switch (m->mode) {
 255        case SVC_POOL_PERCPU:
 256                npools = svc_pool_map_init_percpu(m);
 257                break;
 258        case SVC_POOL_PERNODE:
 259                npools = svc_pool_map_init_pernode(m);
 260                break;
 261        }
 262
 263        if (npools < 0) {
 264                /* default, or memory allocation failure */
 265                npools = 1;
 266                m->mode = SVC_POOL_GLOBAL;
 267        }
 268        m->npools = npools;
 269
 270        mutex_unlock(&svc_pool_map_mutex);
 271        return m->npools;
 272}
 273
 274
 275/*
 276 * Drop a reference to the global map of cpus to pools.
 277 * When the last reference is dropped, the map data is
 278 * freed; this allows the sysadmin to change the pool
 279 * mode using the pool_mode module option without
 280 * rebooting or re-loading sunrpc.ko.
 281 */
 282static void
 283svc_pool_map_put(void)
 284{
 285        struct svc_pool_map *m = &svc_pool_map;
 286
 287        mutex_lock(&svc_pool_map_mutex);
 288
 289        if (!--m->count) {
 290                kfree(m->to_pool);
 291                m->to_pool = NULL;
 292                kfree(m->pool_to);
 293                m->pool_to = NULL;
 294                m->npools = 0;
 295        }
 296
 297        mutex_unlock(&svc_pool_map_mutex);
 298}
 299
 300
 301static int svc_pool_map_get_node(unsigned int pidx)
 302{
 303        const struct svc_pool_map *m = &svc_pool_map;
 304
 305        if (m->count) {
 306                if (m->mode == SVC_POOL_PERCPU)
 307                        return cpu_to_node(m->pool_to[pidx]);
 308                if (m->mode == SVC_POOL_PERNODE)
 309                        return m->pool_to[pidx];
 310        }
 311        return NUMA_NO_NODE;
 312}
 313/*
 314 * Set the given thread's cpus_allowed mask so that it
 315 * will only run on cpus in the given pool.
 316 */
 317static inline void
 318svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
 319{
 320        struct svc_pool_map *m = &svc_pool_map;
 321        unsigned int node = m->pool_to[pidx];
 322
 323        /*
 324         * The caller checks for sv_nrpools > 1, which
 325         * implies that we've been initialized.
 326         */
 327        BUG_ON(m->count == 0);
 328
 329        switch (m->mode) {
 330        case SVC_POOL_PERCPU:
 331        {
 332                set_cpus_allowed_ptr(task, cpumask_of(node));
 333                break;
 334        }
 335        case SVC_POOL_PERNODE:
 336        {
 337                set_cpus_allowed_ptr(task, cpumask_of_node(node));
 338                break;
 339        }
 340        }
 341}
 342
 343/*
 344 * Use the mapping mode to choose a pool for a given CPU.
 345 * Used when enqueueing an incoming RPC.  Always returns
 346 * a non-NULL pool pointer.
 347 */
 348struct svc_pool *
 349svc_pool_for_cpu(struct svc_serv *serv, int cpu)
 350{
 351        struct svc_pool_map *m = &svc_pool_map;
 352        unsigned int pidx = 0;
 353
 354        /*
 355         * An uninitialised map happens in a pure client when
 356         * lockd is brought up, so silently treat it the
 357         * same as SVC_POOL_GLOBAL.
 358         */
 359        if (svc_serv_is_pooled(serv)) {
 360                switch (m->mode) {
 361                case SVC_POOL_PERCPU:
 362                        pidx = m->to_pool[cpu];
 363                        break;
 364                case SVC_POOL_PERNODE:
 365                        pidx = m->to_pool[cpu_to_node(cpu)];
 366                        break;
 367                }
 368        }
 369        return &serv->sv_pools[pidx % serv->sv_nrpools];
 370}
 371
 372int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
 373{
 374        int err;
 375
 376        err = rpcb_create_local(net);
 377        if (err)
 378                return err;
 379
 380        /* Remove any stale portmap registrations */
 381        svc_unregister(serv, net);
 382        return 0;
 383}
 384EXPORT_SYMBOL_GPL(svc_rpcb_setup);
 385
 386void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
 387{
 388        svc_unregister(serv, net);
 389        rpcb_put_local(net);
 390}
 391EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
 392
 393static int svc_uses_rpcbind(struct svc_serv *serv)
 394{
 395        struct svc_program      *progp;
 396        unsigned int            i;
 397
 398        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 399                for (i = 0; i < progp->pg_nvers; i++) {
 400                        if (progp->pg_vers[i] == NULL)
 401                                continue;
 402                        if (progp->pg_vers[i]->vs_hidden == 0)
 403                                return 1;
 404                }
 405        }
 406
 407        return 0;
 408}
 409
 410int svc_bind(struct svc_serv *serv, struct net *net)
 411{
 412        if (!svc_uses_rpcbind(serv))
 413                return 0;
 414        return svc_rpcb_setup(serv, net);
 415}
 416EXPORT_SYMBOL_GPL(svc_bind);
 417
 418/*
 419 * Create an RPC service
 420 */
 421static struct svc_serv *
 422__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 423             void (*shutdown)(struct svc_serv *serv, struct net *net))
 424{
 425        struct svc_serv *serv;
 426        unsigned int vers;
 427        unsigned int xdrsize;
 428        unsigned int i;
 429
 430        if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
 431                return NULL;
 432        serv->sv_name      = prog->pg_name;
 433        serv->sv_program   = prog;
 434        serv->sv_nrthreads = 1;
 435        serv->sv_stats     = prog->pg_stats;
 436        if (bufsize > RPCSVC_MAXPAYLOAD)
 437                bufsize = RPCSVC_MAXPAYLOAD;
 438        serv->sv_max_payload = bufsize? bufsize : 4096;
 439        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
 440        serv->sv_shutdown  = shutdown;
 441        xdrsize = 0;
 442        while (prog) {
 443                prog->pg_lovers = prog->pg_nvers-1;
 444                for (vers=0; vers<prog->pg_nvers ; vers++)
 445                        if (prog->pg_vers[vers]) {
 446                                prog->pg_hivers = vers;
 447                                if (prog->pg_lovers > vers)
 448                                        prog->pg_lovers = vers;
 449                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
 450                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
 451                        }
 452                prog = prog->pg_next;
 453        }
 454        serv->sv_xdrsize   = xdrsize;
 455        INIT_LIST_HEAD(&serv->sv_tempsocks);
 456        INIT_LIST_HEAD(&serv->sv_permsocks);
 457        init_timer(&serv->sv_temptimer);
 458        spin_lock_init(&serv->sv_lock);
 459
 460        serv->sv_nrpools = npools;
 461        serv->sv_pools =
 462                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
 463                        GFP_KERNEL);
 464        if (!serv->sv_pools) {
 465                kfree(serv);
 466                return NULL;
 467        }
 468
 469        for (i = 0; i < serv->sv_nrpools; i++) {
 470                struct svc_pool *pool = &serv->sv_pools[i];
 471
 472                dprintk("svc: initialising pool %u for %s\n",
 473                                i, serv->sv_name);
 474
 475                pool->sp_id = i;
 476                INIT_LIST_HEAD(&pool->sp_threads);
 477                INIT_LIST_HEAD(&pool->sp_sockets);
 478                INIT_LIST_HEAD(&pool->sp_all_threads);
 479                spin_lock_init(&pool->sp_lock);
 480        }
 481
 482        if (svc_uses_rpcbind(serv) && (!serv->sv_shutdown))
 483                serv->sv_shutdown = svc_rpcb_cleanup;
 484
 485        return serv;
 486}
 487
 488struct svc_serv *
 489svc_create(struct svc_program *prog, unsigned int bufsize,
 490           void (*shutdown)(struct svc_serv *serv, struct net *net))
 491{
 492        return __svc_create(prog, bufsize, /*npools*/1, shutdown);
 493}
 494EXPORT_SYMBOL_GPL(svc_create);
 495
 496struct svc_serv *
 497svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 498                  void (*shutdown)(struct svc_serv *serv, struct net *net),
 499                  svc_thread_fn func, struct module *mod)
 500{
 501        struct svc_serv *serv;
 502        unsigned int npools = svc_pool_map_get();
 503
 504        serv = __svc_create(prog, bufsize, npools, shutdown);
 505
 506        if (serv != NULL) {
 507                serv->sv_function = func;
 508                serv->sv_module = mod;
 509        }
 510
 511        return serv;
 512}
 513EXPORT_SYMBOL_GPL(svc_create_pooled);
 514
 515void svc_shutdown_net(struct svc_serv *serv, struct net *net)
 516{
 517        /*
 518         * The set of xprts (contained in the sv_tempsocks and
 519         * sv_permsocks lists) is now constant, since it is modified
 520         * only by accepting new sockets (done by service threads in
 521         * svc_recv) or aging old ones (done by sv_temptimer), or
 522         * configuration changes (excluded by whatever locking the
 523         * caller is using--nfsd_mutex in the case of nfsd).  So it's
 524         * safe to traverse those lists and shut everything down:
 525         */
 526        svc_close_net(serv, net);
 527
 528        if (serv->sv_shutdown)
 529                serv->sv_shutdown(serv, net);
 530}
 531EXPORT_SYMBOL_GPL(svc_shutdown_net);
 532
 533/*
 534 * Destroy an RPC service. Should be called with appropriate locking to
 535 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 536 */
 537void
 538svc_destroy(struct svc_serv *serv)
 539{
 540        dprintk("svc: svc_destroy(%s, %d)\n",
 541                                serv->sv_program->pg_name,
 542                                serv->sv_nrthreads);
 543
 544        if (serv->sv_nrthreads) {
 545                if (--(serv->sv_nrthreads) != 0) {
 546                        svc_sock_update_bufs(serv);
 547                        return;
 548                }
 549        } else
 550                printk("svc_destroy: no threads for serv=%p!\n", serv);
 551
 552        del_timer_sync(&serv->sv_temptimer);
 553
 554        /*
 555         * The last user is gone and thus all sockets have to be destroyed to
 556         * the point. Check this.
 557         */
 558        BUG_ON(!list_empty(&serv->sv_permsocks));
 559        BUG_ON(!list_empty(&serv->sv_tempsocks));
 560
 561        cache_clean_deferred(serv);
 562
 563        if (svc_serv_is_pooled(serv))
 564                svc_pool_map_put();
 565
 566        kfree(serv->sv_pools);
 567        kfree(serv);
 568}
 569EXPORT_SYMBOL_GPL(svc_destroy);
 570
 571/*
 572 * Allocate an RPC server's buffer space.
 573 * We allocate pages and place them in rq_argpages.
 574 */
 575static int
 576svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
 577{
 578        unsigned int pages, arghi;
 579
 580        /* bc_xprt uses fore channel allocated buffers */
 581        if (svc_is_backchannel(rqstp))
 582                return 1;
 583
 584        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
 585                                       * We assume one is at most one page
 586                                       */
 587        arghi = 0;
 588        BUG_ON(pages > RPCSVC_MAXPAGES);
 589        while (pages) {
 590                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
 591                if (!p)
 592                        break;
 593                rqstp->rq_pages[arghi++] = p;
 594                pages--;
 595        }
 596        return pages == 0;
 597}
 598
 599/*
 600 * Release an RPC server buffer
 601 */
 602static void
 603svc_release_buffer(struct svc_rqst *rqstp)
 604{
 605        unsigned int i;
 606
 607        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
 608                if (rqstp->rq_pages[i])
 609                        put_page(rqstp->rq_pages[i]);
 610}
 611
 612struct svc_rqst *
 613svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 614{
 615        struct svc_rqst *rqstp;
 616
 617        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
 618        if (!rqstp)
 619                goto out_enomem;
 620
 621        init_waitqueue_head(&rqstp->rq_wait);
 622
 623        serv->sv_nrthreads++;
 624        spin_lock_bh(&pool->sp_lock);
 625        pool->sp_nrthreads++;
 626        list_add(&rqstp->rq_all, &pool->sp_all_threads);
 627        spin_unlock_bh(&pool->sp_lock);
 628        rqstp->rq_server = serv;
 629        rqstp->rq_pool = pool;
 630
 631        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 632        if (!rqstp->rq_argp)
 633                goto out_thread;
 634
 635        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 636        if (!rqstp->rq_resp)
 637                goto out_thread;
 638
 639        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
 640                goto out_thread;
 641
 642        return rqstp;
 643out_thread:
 644        svc_exit_thread(rqstp);
 645out_enomem:
 646        return ERR_PTR(-ENOMEM);
 647}
 648EXPORT_SYMBOL_GPL(svc_prepare_thread);
 649
 650/*
 651 * Choose a pool in which to create a new thread, for svc_set_num_threads
 652 */
 653static inline struct svc_pool *
 654choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 655{
 656        if (pool != NULL)
 657                return pool;
 658
 659        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
 660}
 661
 662/*
 663 * Choose a thread to kill, for svc_set_num_threads
 664 */
 665static inline struct task_struct *
 666choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 667{
 668        unsigned int i;
 669        struct task_struct *task = NULL;
 670
 671        if (pool != NULL) {
 672                spin_lock_bh(&pool->sp_lock);
 673        } else {
 674                /* choose a pool in round-robin fashion */
 675                for (i = 0; i < serv->sv_nrpools; i++) {
 676                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
 677                        spin_lock_bh(&pool->sp_lock);
 678                        if (!list_empty(&pool->sp_all_threads))
 679                                goto found_pool;
 680                        spin_unlock_bh(&pool->sp_lock);
 681                }
 682                return NULL;
 683        }
 684
 685found_pool:
 686        if (!list_empty(&pool->sp_all_threads)) {
 687                struct svc_rqst *rqstp;
 688
 689                /*
 690                 * Remove from the pool->sp_all_threads list
 691                 * so we don't try to kill it again.
 692                 */
 693                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
 694                list_del_init(&rqstp->rq_all);
 695                task = rqstp->rq_task;
 696        }
 697        spin_unlock_bh(&pool->sp_lock);
 698
 699        return task;
 700}
 701
 702/*
 703 * Create or destroy enough new threads to make the number
 704 * of threads the given number.  If `pool' is non-NULL, applies
 705 * only to threads in that pool, otherwise round-robins between
 706 * all pools.  Caller must ensure that mutual exclusion between this and
 707 * server startup or shutdown.
 708 *
 709 * Destroying threads relies on the service threads filling in
 710 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 711 * has been created using svc_create_pooled().
 712 *
 713 * Based on code that used to be in nfsd_svc() but tweaked
 714 * to be pool-aware.
 715 */
 716int
 717svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 718{
 719        struct svc_rqst *rqstp;
 720        struct task_struct *task;
 721        struct svc_pool *chosen_pool;
 722        int error = 0;
 723        unsigned int state = serv->sv_nrthreads-1;
 724        int node;
 725
 726        if (pool == NULL) {
 727                /* The -1 assumes caller has done a svc_get() */
 728                nrservs -= (serv->sv_nrthreads-1);
 729        } else {
 730                spin_lock_bh(&pool->sp_lock);
 731                nrservs -= pool->sp_nrthreads;
 732                spin_unlock_bh(&pool->sp_lock);
 733        }
 734
 735        /* create new threads */
 736        while (nrservs > 0) {
 737                nrservs--;
 738                chosen_pool = choose_pool(serv, pool, &state);
 739
 740                node = svc_pool_map_get_node(chosen_pool->sp_id);
 741                rqstp = svc_prepare_thread(serv, chosen_pool, node);
 742                if (IS_ERR(rqstp)) {
 743                        error = PTR_ERR(rqstp);
 744                        break;
 745                }
 746
 747                __module_get(serv->sv_module);
 748                task = kthread_create_on_node(serv->sv_function, rqstp,
 749                                              node, serv->sv_name);
 750                if (IS_ERR(task)) {
 751                        error = PTR_ERR(task);
 752                        module_put(serv->sv_module);
 753                        svc_exit_thread(rqstp);
 754                        break;
 755                }
 756
 757                rqstp->rq_task = task;
 758                if (serv->sv_nrpools > 1)
 759                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
 760
 761                svc_sock_update_bufs(serv);
 762                wake_up_process(task);
 763        }
 764        /* destroy old threads */
 765        while (nrservs < 0 &&
 766               (task = choose_victim(serv, pool, &state)) != NULL) {
 767                send_sig(SIGINT, task, 1);
 768                nrservs++;
 769        }
 770
 771        return error;
 772}
 773EXPORT_SYMBOL_GPL(svc_set_num_threads);
 774
 775/*
 776 * Called from a server thread as it's exiting. Caller must hold the BKL or
 777 * the "service mutex", whichever is appropriate for the service.
 778 */
 779void
 780svc_exit_thread(struct svc_rqst *rqstp)
 781{
 782        struct svc_serv *serv = rqstp->rq_server;
 783        struct svc_pool *pool = rqstp->rq_pool;
 784
 785        svc_release_buffer(rqstp);
 786        kfree(rqstp->rq_resp);
 787        kfree(rqstp->rq_argp);
 788        kfree(rqstp->rq_auth_data);
 789
 790        spin_lock_bh(&pool->sp_lock);
 791        pool->sp_nrthreads--;
 792        list_del(&rqstp->rq_all);
 793        spin_unlock_bh(&pool->sp_lock);
 794
 795        kfree(rqstp);
 796
 797        /* Release the server */
 798        if (serv)
 799                svc_destroy(serv);
 800}
 801EXPORT_SYMBOL_GPL(svc_exit_thread);
 802
 803/*
 804 * Register an "inet" protocol family netid with the local
 805 * rpcbind daemon via an rpcbind v4 SET request.
 806 *
 807 * No netconfig infrastructure is available in the kernel, so
 808 * we map IP_ protocol numbers to netids by hand.
 809 *
 810 * Returns zero on success; a negative errno value is returned
 811 * if any error occurs.
 812 */
 813static int __svc_rpcb_register4(struct net *net, const u32 program,
 814                                const u32 version,
 815                                const unsigned short protocol,
 816                                const unsigned short port)
 817{
 818        const struct sockaddr_in sin = {
 819                .sin_family             = AF_INET,
 820                .sin_addr.s_addr        = htonl(INADDR_ANY),
 821                .sin_port               = htons(port),
 822        };
 823        const char *netid;
 824        int error;
 825
 826        switch (protocol) {
 827        case IPPROTO_UDP:
 828                netid = RPCBIND_NETID_UDP;
 829                break;
 830        case IPPROTO_TCP:
 831                netid = RPCBIND_NETID_TCP;
 832                break;
 833        default:
 834                return -ENOPROTOOPT;
 835        }
 836
 837        error = rpcb_v4_register(net, program, version,
 838                                        (const struct sockaddr *)&sin, netid);
 839
 840        /*
 841         * User space didn't support rpcbind v4, so retry this
 842         * registration request with the legacy rpcbind v2 protocol.
 843         */
 844        if (error == -EPROTONOSUPPORT)
 845                error = rpcb_register(net, program, version, protocol, port);
 846
 847        return error;
 848}
 849
 850#if IS_ENABLED(CONFIG_IPV6)
 851/*
 852 * Register an "inet6" protocol family netid with the local
 853 * rpcbind daemon via an rpcbind v4 SET request.
 854 *
 855 * No netconfig infrastructure is available in the kernel, so
 856 * we map IP_ protocol numbers to netids by hand.
 857 *
 858 * Returns zero on success; a negative errno value is returned
 859 * if any error occurs.
 860 */
 861static int __svc_rpcb_register6(struct net *net, const u32 program,
 862                                const u32 version,
 863                                const unsigned short protocol,
 864                                const unsigned short port)
 865{
 866        const struct sockaddr_in6 sin6 = {
 867                .sin6_family            = AF_INET6,
 868                .sin6_addr              = IN6ADDR_ANY_INIT,
 869                .sin6_port              = htons(port),
 870        };
 871        const char *netid;
 872        int error;
 873
 874        switch (protocol) {
 875        case IPPROTO_UDP:
 876                netid = RPCBIND_NETID_UDP6;
 877                break;
 878        case IPPROTO_TCP:
 879                netid = RPCBIND_NETID_TCP6;
 880                break;
 881        default:
 882                return -ENOPROTOOPT;
 883        }
 884
 885        error = rpcb_v4_register(net, program, version,
 886                                        (const struct sockaddr *)&sin6, netid);
 887
 888        /*
 889         * User space didn't support rpcbind version 4, so we won't
 890         * use a PF_INET6 listener.
 891         */
 892        if (error == -EPROTONOSUPPORT)
 893                error = -EAFNOSUPPORT;
 894
 895        return error;
 896}
 897#endif  /* IS_ENABLED(CONFIG_IPV6) */
 898
 899/*
 900 * Register a kernel RPC service via rpcbind version 4.
 901 *
 902 * Returns zero on success; a negative errno value is returned
 903 * if any error occurs.
 904 */
 905static int __svc_register(struct net *net, const char *progname,
 906                          const u32 program, const u32 version,
 907                          const int family,
 908                          const unsigned short protocol,
 909                          const unsigned short port)
 910{
 911        int error = -EAFNOSUPPORT;
 912
 913        switch (family) {
 914        case PF_INET:
 915                error = __svc_rpcb_register4(net, program, version,
 916                                                protocol, port);
 917                break;
 918#if IS_ENABLED(CONFIG_IPV6)
 919        case PF_INET6:
 920                error = __svc_rpcb_register6(net, program, version,
 921                                                protocol, port);
 922#endif
 923        }
 924
 925        if (error < 0)
 926                printk(KERN_WARNING "svc: failed to register %sv%u RPC "
 927                        "service (errno %d).\n", progname, version, -error);
 928        return error;
 929}
 930
 931/**
 932 * svc_register - register an RPC service with the local portmapper
 933 * @serv: svc_serv struct for the service to register
 934 * @net: net namespace for the service to register
 935 * @family: protocol family of service's listener socket
 936 * @proto: transport protocol number to advertise
 937 * @port: port to advertise
 938 *
 939 * Service is registered for any address in the passed-in protocol family
 940 */
 941int svc_register(const struct svc_serv *serv, struct net *net,
 942                 const int family, const unsigned short proto,
 943                 const unsigned short port)
 944{
 945        struct svc_program      *progp;
 946        unsigned int            i;
 947        int                     error = 0;
 948
 949        BUG_ON(proto == 0 && port == 0);
 950
 951        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 952                for (i = 0; i < progp->pg_nvers; i++) {
 953                        if (progp->pg_vers[i] == NULL)
 954                                continue;
 955
 956                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
 957                                        progp->pg_name,
 958                                        i,
 959                                        proto == IPPROTO_UDP?  "udp" : "tcp",
 960                                        port,
 961                                        family,
 962                                        progp->pg_vers[i]->vs_hidden?
 963                                                " (but not telling portmap)" : "");
 964
 965                        if (progp->pg_vers[i]->vs_hidden)
 966                                continue;
 967
 968                        error = __svc_register(net, progp->pg_name, progp->pg_prog,
 969                                                i, family, proto, port);
 970                        if (error < 0)
 971                                break;
 972                }
 973        }
 974
 975        return error;
 976}
 977
 978/*
 979 * If user space is running rpcbind, it should take the v4 UNSET
 980 * and clear everything for this [program, version].  If user space
 981 * is running portmap, it will reject the v4 UNSET, but won't have
 982 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 983 * in this case to clear all existing entries for [program, version].
 984 */
 985static void __svc_unregister(struct net *net, const u32 program, const u32 version,
 986                             const char *progname)
 987{
 988        int error;
 989
 990        error = rpcb_v4_register(net, program, version, NULL, "");
 991
 992        /*
 993         * User space didn't support rpcbind v4, so retry this
 994         * request with the legacy rpcbind v2 protocol.
 995         */
 996        if (error == -EPROTONOSUPPORT)
 997                error = rpcb_register(net, program, version, 0, 0);
 998
 999        dprintk("svc: %s(%sv%u), error %d\n",
1000                        __func__, progname, version, error);
1001}
1002
1003/*
1004 * All netids, bind addresses and ports registered for [program, version]
1005 * are removed from the local rpcbind database (if the service is not
1006 * hidden) to make way for a new instance of the service.
1007 *
1008 * The result of unregistration is reported via dprintk for those who want
1009 * verification of the result, but is otherwise not important.
1010 */
1011static void svc_unregister(const struct svc_serv *serv, struct net *net)
1012{
1013        struct svc_program *progp;
1014        unsigned long flags;
1015        unsigned int i;
1016
1017        clear_thread_flag(TIF_SIGPENDING);
1018
1019        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1020                for (i = 0; i < progp->pg_nvers; i++) {
1021                        if (progp->pg_vers[i] == NULL)
1022                                continue;
1023                        if (progp->pg_vers[i]->vs_hidden)
1024                                continue;
1025
1026                        dprintk("svc: attempting to unregister %sv%u\n",
1027                                progp->pg_name, i);
1028                        __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
1029                }
1030        }
1031
1032        spin_lock_irqsave(&current->sighand->siglock, flags);
1033        recalc_sigpending();
1034        spin_unlock_irqrestore(&current->sighand->siglock, flags);
1035}
1036
1037/*
1038 * Printk the given error with the address of the client that caused it.
1039 */
1040static __printf(2, 3)
1041void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1042{
1043        struct va_format vaf;
1044        va_list args;
1045        char    buf[RPC_MAX_ADDRBUFLEN];
1046
1047        va_start(args, fmt);
1048
1049        vaf.fmt = fmt;
1050        vaf.va = &args;
1051
1052        net_warn_ratelimited("svc: %s: %pV",
1053                             svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
1054
1055        va_end(args);
1056}
1057
1058/*
1059 * Common routine for processing the RPC request.
1060 */
1061static int
1062svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1063{
1064        struct svc_program      *progp;
1065        struct svc_version      *versp = NULL;  /* compiler food */
1066        struct svc_procedure    *procp = NULL;
1067        struct svc_serv         *serv = rqstp->rq_server;
1068        kxdrproc_t              xdr;
1069        __be32                  *statp;
1070        u32                     prog, vers, proc;
1071        __be32                  auth_stat, rpc_stat;
1072        int                     auth_res;
1073        __be32                  *reply_statp;
1074
1075        rpc_stat = rpc_success;
1076
1077        if (argv->iov_len < 6*4)
1078                goto err_short_len;
1079
1080        /* Will be turned off only in gss privacy case: */
1081        rqstp->rq_splice_ok = 1;
1082        /* Will be turned off only when NFSv4 Sessions are used */
1083        rqstp->rq_usedeferral = 1;
1084        rqstp->rq_dropme = false;
1085
1086        /* Setup reply header */
1087        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
1088
1089        svc_putu32(resv, rqstp->rq_xid);
1090
1091        vers = svc_getnl(argv);
1092
1093        /* First words of reply: */
1094        svc_putnl(resv, 1);             /* REPLY */
1095
1096        if (vers != 2)          /* RPC version number */
1097                goto err_bad_rpc;
1098
1099        /* Save position in case we later decide to reject: */
1100        reply_statp = resv->iov_base + resv->iov_len;
1101
1102        svc_putnl(resv, 0);             /* ACCEPT */
1103
1104        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
1105        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
1106        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
1107
1108        progp = serv->sv_program;
1109
1110        for (progp = serv->sv_program; progp; progp = progp->pg_next)
1111                if (prog == progp->pg_prog)
1112                        break;
1113
1114        /*
1115         * Decode auth data, and add verifier to reply buffer.
1116         * We do this before anything else in order to get a decent
1117         * auth verifier.
1118         */
1119        auth_res = svc_authenticate(rqstp, &auth_stat);
1120        /* Also give the program a chance to reject this call: */
1121        if (auth_res == SVC_OK && progp) {
1122                auth_stat = rpc_autherr_badcred;
1123                auth_res = progp->pg_authenticate(rqstp);
1124        }
1125        switch (auth_res) {
1126        case SVC_OK:
1127                break;
1128        case SVC_GARBAGE:
1129                goto err_garbage;
1130        case SVC_SYSERR:
1131                rpc_stat = rpc_system_err;
1132                goto err_bad;
1133        case SVC_DENIED:
1134                goto err_bad_auth;
1135        case SVC_CLOSE:
1136                if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1137                        svc_close_xprt(rqstp->rq_xprt);
1138        case SVC_DROP:
1139                goto dropit;
1140        case SVC_COMPLETE:
1141                goto sendit;
1142        }
1143
1144        if (progp == NULL)
1145                goto err_bad_prog;
1146
1147        if (vers >= progp->pg_nvers ||
1148          !(versp = progp->pg_vers[vers]))
1149                goto err_bad_vers;
1150
1151        procp = versp->vs_proc + proc;
1152        if (proc >= versp->vs_nproc || !procp->pc_func)
1153                goto err_bad_proc;
1154        rqstp->rq_procinfo = procp;
1155
1156        /* Syntactic check complete */
1157        serv->sv_stats->rpccnt++;
1158
1159        /* Build the reply header. */
1160        statp = resv->iov_base +resv->iov_len;
1161        svc_putnl(resv, RPC_SUCCESS);
1162
1163        /* Bump per-procedure stats counter */
1164        procp->pc_count++;
1165
1166        /* Initialize storage for argp and resp */
1167        memset(rqstp->rq_argp, 0, procp->pc_argsize);
1168        memset(rqstp->rq_resp, 0, procp->pc_ressize);
1169
1170        /* un-reserve some of the out-queue now that we have a
1171         * better idea of reply size
1172         */
1173        if (procp->pc_xdrressize)
1174                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1175
1176        /* Call the function that processes the request. */
1177        if (!versp->vs_dispatch) {
1178                /* Decode arguments */
1179                xdr = procp->pc_decode;
1180                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
1181                        goto err_garbage;
1182
1183                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
1184
1185                /* Encode reply */
1186                if (rqstp->rq_dropme) {
1187                        if (procp->pc_release)
1188                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1189                        goto dropit;
1190                }
1191                if (*statp == rpc_success &&
1192                    (xdr = procp->pc_encode) &&
1193                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1194                        dprintk("svc: failed to encode reply\n");
1195                        /* serv->sv_stats->rpcsystemerr++; */
1196                        *statp = rpc_system_err;
1197                }
1198        } else {
1199                dprintk("svc: calling dispatcher\n");
1200                if (!versp->vs_dispatch(rqstp, statp)) {
1201                        /* Release reply info */
1202                        if (procp->pc_release)
1203                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1204                        goto dropit;
1205                }
1206        }
1207
1208        /* Check RPC status result */
1209        if (*statp != rpc_success)
1210                resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
1211
1212        /* Release reply info */
1213        if (procp->pc_release)
1214                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1215
1216        if (procp->pc_encode == NULL)
1217                goto dropit;
1218
1219 sendit:
1220        if (svc_authorise(rqstp))
1221                goto dropit;
1222        return 1;               /* Caller can now send it */
1223
1224 dropit:
1225        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
1226        dprintk("svc: svc_process dropit\n");
1227        return 0;
1228
1229err_short_len:
1230        svc_printk(rqstp, "short len %Zd, dropping request\n",
1231                        argv->iov_len);
1232
1233        goto dropit;                    /* drop request */
1234
1235err_bad_rpc:
1236        serv->sv_stats->rpcbadfmt++;
1237        svc_putnl(resv, 1);     /* REJECT */
1238        svc_putnl(resv, 0);     /* RPC_MISMATCH */
1239        svc_putnl(resv, 2);     /* Only RPCv2 supported */
1240        svc_putnl(resv, 2);
1241        goto sendit;
1242
1243err_bad_auth:
1244        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1245        serv->sv_stats->rpcbadauth++;
1246        /* Restore write pointer to location of accept status: */
1247        xdr_ressize_check(rqstp, reply_statp);
1248        svc_putnl(resv, 1);     /* REJECT */
1249        svc_putnl(resv, 1);     /* AUTH_ERROR */
1250        svc_putnl(resv, ntohl(auth_stat));      /* status */
1251        goto sendit;
1252
1253err_bad_prog:
1254        dprintk("svc: unknown program %d\n", prog);
1255        serv->sv_stats->rpcbadfmt++;
1256        svc_putnl(resv, RPC_PROG_UNAVAIL);
1257        goto sendit;
1258
1259err_bad_vers:
1260        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1261                       vers, prog, progp->pg_name);
1262
1263        serv->sv_stats->rpcbadfmt++;
1264        svc_putnl(resv, RPC_PROG_MISMATCH);
1265        svc_putnl(resv, progp->pg_lovers);
1266        svc_putnl(resv, progp->pg_hivers);
1267        goto sendit;
1268
1269err_bad_proc:
1270        svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1271
1272        serv->sv_stats->rpcbadfmt++;
1273        svc_putnl(resv, RPC_PROC_UNAVAIL);
1274        goto sendit;
1275
1276err_garbage:
1277        svc_printk(rqstp, "failed to decode args\n");
1278
1279        rpc_stat = rpc_garbage_args;
1280err_bad:
1281        serv->sv_stats->rpcbadfmt++;
1282        svc_putnl(resv, ntohl(rpc_stat));
1283        goto sendit;
1284}
1285EXPORT_SYMBOL_GPL(svc_process);
1286
1287/*
1288 * Process the RPC request.
1289 */
1290int
1291svc_process(struct svc_rqst *rqstp)
1292{
1293        struct kvec             *argv = &rqstp->rq_arg.head[0];
1294        struct kvec             *resv = &rqstp->rq_res.head[0];
1295        struct svc_serv         *serv = rqstp->rq_server;
1296        u32                     dir;
1297
1298        /*
1299         * Setup response xdr_buf.
1300         * Initially it has just one page
1301         */
1302        rqstp->rq_resused = 1;
1303        resv->iov_base = page_address(rqstp->rq_respages[0]);
1304        resv->iov_len = 0;
1305        rqstp->rq_res.pages = rqstp->rq_respages + 1;
1306        rqstp->rq_res.len = 0;
1307        rqstp->rq_res.page_base = 0;
1308        rqstp->rq_res.page_len = 0;
1309        rqstp->rq_res.buflen = PAGE_SIZE;
1310        rqstp->rq_res.tail[0].iov_base = NULL;
1311        rqstp->rq_res.tail[0].iov_len = 0;
1312
1313        rqstp->rq_xid = svc_getu32(argv);
1314
1315        dir  = svc_getnl(argv);
1316        if (dir != 0) {
1317                /* direction != CALL */
1318                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1319                serv->sv_stats->rpcbadfmt++;
1320                svc_drop(rqstp);
1321                return 0;
1322        }
1323
1324        /* Returns 1 for send, 0 for drop */
1325        if (svc_process_common(rqstp, argv, resv))
1326                return svc_send(rqstp);
1327        else {
1328                svc_drop(rqstp);
1329                return 0;
1330        }
1331}
1332
1333#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1334/*
1335 * Process a backchannel RPC request that arrived over an existing
1336 * outbound connection
1337 */
1338int
1339bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1340               struct svc_rqst *rqstp)
1341{
1342        struct kvec     *argv = &rqstp->rq_arg.head[0];
1343        struct kvec     *resv = &rqstp->rq_res.head[0];
1344
1345        /* Build the svc_rqst used by the common processing routine */
1346        rqstp->rq_xprt = serv->sv_bc_xprt;
1347        rqstp->rq_xid = req->rq_xid;
1348        rqstp->rq_prot = req->rq_xprt->prot;
1349        rqstp->rq_server = serv;
1350
1351        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
1352        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
1353        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
1354        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1355
1356        /* reset result send buffer "put" position */
1357        resv->iov_len = 0;
1358
1359        if (rqstp->rq_prot != IPPROTO_TCP) {
1360                printk(KERN_ERR "No support for Non-TCP transports!\n");
1361                BUG();
1362        }
1363
1364        /*
1365         * Skip the next two words because they've already been
1366         * processed in the trasport
1367         */
1368        svc_getu32(argv);       /* XID */
1369        svc_getnl(argv);        /* CALLDIR */
1370
1371        /* Returns 1 for send, 0 for drop */
1372        if (svc_process_common(rqstp, argv, resv)) {
1373                memcpy(&req->rq_snd_buf, &rqstp->rq_res,
1374                                                sizeof(req->rq_snd_buf));
1375                return bc_send(req);
1376        } else {
1377                /* drop request */
1378                xprt_free_bc_request(req);
1379                return 0;
1380        }
1381}
1382EXPORT_SYMBOL_GPL(bc_svc_process);
1383#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1384
1385/*
1386 * Return (transport-specific) limit on the rpc payload.
1387 */
1388u32 svc_max_payload(const struct svc_rqst *rqstp)
1389{
1390        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1391
1392        if (rqstp->rq_server->sv_max_payload < max)
1393                max = rqstp->rq_server->sv_max_payload;
1394        return max;
1395}
1396EXPORT_SYMBOL_GPL(svc_max_payload);
1397
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.