/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv);

#define svc_serv_is_pooled(serv)    ((serv)->sv_function)

/*
 * Mode for mapping cpus to pools.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};
#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
static struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
} svc_pool_map = {
	.count = 0,
	.mode = SVC_POOL_DEFAULT
};
static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
param_get_pool_mode(char *buf, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode", 20);
	default:
		return sprintf(buf, "%d", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);

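/*
 * Illustrative usage (editor's sketch, not part of the original file):
 * because the parameter above is registered with permissions 0644, an
 * administrator can pick a mapping mode before any pooled service
 * starts, e.g.
 *
 *	# modprobe sunrpc pool_mode=pernode
 * or
 *	# echo pernode > /sys/module/sunrpc/parameters/pool_mode
 *
 * Once svc_pool_map.count is nonzero (i.e. while any pooled service is
 * running), param_set_pool_mode() above rejects the write with -EBUSY.
 */
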
/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (num_online_nodes() > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = any_online_node(node_online_map);
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}
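
/*
 * Illustrative outcomes of the heuristic above (assumed topologies):
 *
 *	- two NUMA nodes, 8 cpus each	-> SVC_POOL_PERNODE (2 pools)
 *	- one node, 8 online cpus	-> SVC_POOL_PERCPU  (8 pools)
 *	- one node, 1 or 2 cpus		-> SVC_POOL_GLOBAL  (1 pool)
 */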

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx >= maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools < 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	mutex_unlock(&svc_pool_map_mutex);
	return m->npools;
}
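
/*
 * Illustrative pairing (editor's note, derived from the code in this
 * file): every successful svc_pool_map_get() must eventually be matched
 * by a svc_pool_map_put(), which is exactly what svc_create_pooled()
 * and svc_destroy() below do:
 *
 *	npools = svc_pool_map_get();	-- first user builds the map
 *	...
 *	svc_pool_map_put();		-- last user frees it
 */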


/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		m->mode = SVC_POOL_DEFAULT;
		kfree(m->to_pool);
		kfree(m->pool_to);
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}


/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	/* in SVC_POOL_PERCPU mode, "node" actually holds a cpu id */
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	BUG_ON(m->count == 0);

	switch (m->mode) {
	case SVC_POOL_PERCPU:
	{
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	}
	case SVC_POOL_PERNODE:
	{
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
	}
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int pidx = 0;

	/*
	 * An uninitialised map happens in a pure client when
	 * lockd is brought up, so silently treat it the
	 * same as SVC_POOL_GLOBAL.
	 */
	if (svc_serv_is_pooled(serv)) {
		switch (m->mode) {
		case SVC_POOL_PERCPU:
			pidx = m->to_pool[cpu];
			break;
		case SVC_POOL_PERNODE:
			pidx = m->to_pool[cpu_to_node(cpu)];
			break;
		}
	}
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}
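
/*
 * Illustrative caller (a sketch, not from this file): the transport
 * enqueue path picks the pool for the cpu that is handling the
 * incoming data, roughly:
 *
 *	cpu = get_cpu();
 *	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
 *	put_cpu();
 *
 * so requests tend to be served by threads bound near the cpu that
 * received them.
 */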


/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     void (*shutdown)(struct svc_serv *serv))
{
	struct svc_serv *serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	serv->sv_nrthreads = 1;
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_shutdown  = shutdown;
	xdrsize = 0;
	while (prog) {
		prog->pg_lovers = prog->pg_nvers-1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	init_timer(&serv->sv_temptimer);
	spin_lock_init(&serv->sv_lock);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_threads);
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	/* Remove any stale portmap registrations */
	svc_unregister(serv);

	return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
	   void (*shutdown)(struct svc_serv *serv))
{
	return __svc_create(prog, bufsize, /*npools*/1, shutdown);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
		  void (*shutdown)(struct svc_serv *serv),
		  svc_thread_fn func, struct module *mod)
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, shutdown);

	if (serv != NULL) {
		serv->sv_function = func;
		serv->sv_module = mod;
	}

	return serv;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
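
/*
 * Illustrative caller (a sketch of how an NFS server might use this
 * API; the specific names are assumptions, not part of this file):
 *
 *	serv = svc_create_pooled(&nfsd_program, nfsd_max_blksize,
 *				 nfsd_last_thread, nfsd, THIS_MODULE);
 *	if (serv == NULL)
 *		return -ENOMEM;
 *
 * sv_function ("nfsd" here) is what each pooled thread runs, and
 * sv_module pins the owning module while threads exist.
 */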

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
	dprintk("svc: svc_destroy(%s, %d)\n",
				serv->sv_program->pg_name,
				serv->sv_nrthreads);

	if (serv->sv_nrthreads) {
		if (--(serv->sv_nrthreads) != 0) {
			svc_sock_update_bufs(serv);
			return;
		}
	} else
		printk(KERN_ERR "svc_destroy: no threads for serv=%p!\n", serv);

	del_timer_sync(&serv->sv_temptimer);

	svc_close_all(&serv->sv_tempsocks);

	if (serv->sv_shutdown)
		serv->sv_shutdown(serv);

	svc_close_all(&serv->sv_permsocks);

	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	if (svc_serv_is_pooled(serv))
		svc_pool_map_put();

	svc_unregister(serv);
	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size)
{
	unsigned int pages, arghi;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request
				       * and reply.  We assume one of them is
				       * at most one page.
				       */
	arghi = 0;
	BUG_ON(pages > RPCSVC_MAXPAGES);
	while (pages) {
		struct page *p = alloc_page(GFP_KERNEL);
		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}
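
/*
 * Worked example (illustrative): with PAGE_SIZE 4096 and a service
 * created with bufsize 32768, sv_max_mesg is roundup(32768 + 4096,
 * 4096) = 36864, so this allocates 36864/4096 + 1 = 10 pages: enough
 * for the largest message plus the extra page shared between request
 * and reply.
 */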

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool)
{
	struct svc_rqst *rqstp;

	rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
	if (!rqstp)
		goto out_enomem;

	init_waitqueue_head(&rqstp->rq_wait);

	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_argp)
		goto out_thread;

	rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_resp)
		goto out_thread;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg))
		goto out_thread;

	return rqstp;
out_thread:
	svc_exit_thread(rqstp);
out_enomem:
	return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		list_del_init(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/*
 * Create or destroy enough new threads to make the number
 * of threads the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Must be called with a svc_get() reference and
 * the BKL or another lock to protect access to svc_serv fields.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst *rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	int error = 0;
	unsigned int state = serv->sv_nrthreads-1;

	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads-1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	/* create new threads */
	while (nrservs > 0) {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		rqstp = svc_prepare_thread(serv, chosen_pool);
		if (IS_ERR(rqstp)) {
			error = PTR_ERR(rqstp);
			break;
		}

		__module_get(serv->sv_module);
		task = kthread_create(serv->sv_function, rqstp, "%s",
				      serv->sv_name);
		if (IS_ERR(task)) {
			error = PTR_ERR(task);
			module_put(serv->sv_module);
			svc_exit_thread(rqstp);
			break;
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	}
	/* destroy old threads */
	while (nrservs < 0 &&
	       (task = choose_victim(serv, pool, &state)) != NULL) {
		send_sig(SIGINT, task, 1);
		nrservs++;
	}

	return error;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
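
/*
 * Illustrative use (a sketch; the caller names are assumptions, not
 * from this file): nfsd resizes its thread herd from its "threads"
 * procfile with something like
 *
 *	error = svc_set_num_threads(nfsd_serv, NULL, nrservs);
 *
 * A positive delta spawns threads via kthread_create() above; a
 * negative delta signals victims with SIGINT and lets them exit via
 * svc_exit_thread() themselves.
 */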

/*
 * Called from a server thread as it's exiting. Caller must hold the BKL or
 * the "service mutex", whichever is appropriate for the service.
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;

	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	list_del(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	kfree(rqstp);

	/* Release the server */
	if (serv)
		svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IPPROTO_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(const u32 program, const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(program, version, protocol, port);

	return error;
}

#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IPPROTO_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(const u32 program, const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(program, version,
						protocol, port);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		error = __svc_rpcb_register6(program, version,
						protocol, port);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	}

	if (error < 0)
		printk(KERN_WARNING "svc: failed to register %sv%u RPC "
			"service (errno %d).\n", progname, version, -error);
	return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, const int family,
		 const unsigned short proto, const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	BUG_ON(proto == 0 && port == 0);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;

			dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
					progp->pg_name,
					i,
					proto == IPPROTO_UDP ? "udp" : "tcp",
					port,
					family,
					progp->pg_vers[i]->vs_hidden ?
						" (but not telling portmap)" : "");

			if (progp->pg_vers[i]->vs_hidden)
				continue;

			error = __svc_register(progp->pg_name, progp->pg_prog,
						i, family, proto, port);
			if (error < 0)
				break;
		}
	}

	return error;
}
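
/*
 * Illustrative call (an assumption for demonstration, not taken from
 * this file): advertising an NFS service over TCP on its well-known
 * port would look like
 *
 *	error = svc_register(serv, PF_INET, IPPROTO_TCP, 2049);
 *
 * which walks every non-hidden program/version pair and issues an
 * rpcbind v4 SET (falling back to portmap v2) for each.
 */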

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(program, version, 0, 0);

	dprintk("svc: %s(%sv%u), error %d\n",
			__func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;

			__svc_unregister(progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Printk the given error with the address of the client that caused it.
 */
static int
__attribute__ ((format (printf, 2, 3)))
svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	va_list args;
	int	r;
	char	buf[RPC_MAX_ADDRBUFLEN];

	if (!net_ratelimit())
		return 0;

	printk(KERN_WARNING "svc: %s: ",
		svc_print_addr(rqstp, buf, sizeof(buf)));

	va_start(args, fmt);
	r = vprintk(fmt, args);
	va_end(args);

	return r;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct svc_program	*progp;
	struct svc_version	*versp = NULL;	/* compiler food */
	struct svc_procedure	*procp = NULL;
	struct kvec *		argv = &rqstp->rq_arg.head[0];
	struct kvec *		resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	kxdrproc_t		xdr;
	__be32			*statp;
	u32			dir, prog, vers, proc;
	__be32			auth_stat, rpc_stat;
	int			auth_res;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;
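
	/*
	 * Illustrative note (editorial addition): 6*4 above is the
	 * smallest possible RPC call header, six 32-bit XDR words
	 * before the credential and verifier (RFC 1831): xid,
	 * direction, rpcvers, prog, vers and proc -- exactly the
	 * fields decoded below.
	 */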

	/* setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_resused = 1;
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;
	/* Will be turned off only in gss privacy case: */
	rqstp->rq_splice_ok = 1;
	/* Will be turned off only when NFSv4 Sessions are used */
	rqstp->rq_usedeferral = 1;

	/* Setup reply header */
	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);

	rqstp->rq_xid = svc_getu32(argv);
	svc_putu32(resv, rqstp->rq_xid);

	dir  = svc_getnl(argv);
	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (dir != 0)		/* direction != CALL */
		goto err_bad_dir;
	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp, &auth_stat);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp) {
		auth_stat = rpc_autherr_badcred;
		auth_res = progp->pg_authenticate(rqstp);
	}
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	if (vers >= progp->pg_nvers ||
	  !(versp = progp->pg_vers[vers]))
		goto err_bad_vers;

	procp = versp->vs_proc + proc;
	if (proc >= versp->vs_nproc || !procp->pc_func)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* Bump per-procedure stats counter */
	procp->pc_count++;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	if (!versp->vs_dispatch) {
		/* Decode arguments */
		xdr = procp->pc_decode;
		if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
			goto err_garbage;

		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);

		/* Encode reply */
		if (*statp == rpc_drop_reply) {
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
		if (*statp == rpc_success && (xdr = procp->pc_encode)
		 && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
			dprintk("svc: failed to encode reply\n");
			/* serv->sv_stats->rpcsystemerr++; */
			*statp = rpc_system_err;
		}
	} else {
		dprintk("svc: calling dispatcher\n");
		if (!versp->vs_dispatch(rqstp, statp)) {
			/* Release reply info */
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
	}

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void *)statp) - resv->iov_base + 4;

	/* Release reply info */
	if (procp->pc_release)
		procp->pc_release(rqstp, NULL, rqstp->rq_resp);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto dropit;
	return svc_send(rqstp);

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	svc_drop(rqstp);
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %Zd, dropping request\n",
			argv->iov_len);

	goto dropit;			/* drop request */

err_bad_dir:
	svc_printk(rqstp, "bad direction %d, dropping request\n", dir);

	serv->sv_stats->rpcbadfmt++;
	goto dropit;			/* drop request */

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putnl(resv, ntohl(auth_stat));	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       vers, prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, progp->pg_lovers);
	svc_putnl(resv, progp->pg_hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}
EXPORT_SYMBOL_GPL(svc_process);

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
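
/*
 * Worked example (illustrative): if the transport class advertises an
 * xcl_max_payload of RPCSVC_MAXPAYLOAD_TCP (1MB) but the service was
 * created with bufsize 32768, svc_max_payload() returns 32768: the
 * tighter of the transport and per-service limits wins.
 */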