linux/net/sunrpc/svc.c
<<
>>
Prefs
   1/*
   2 * linux/net/sunrpc/svc.c
   3 *
   4 * High-level RPC service routines
   5 *
   6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
   7 *
   8 * Multiple threads pools and NUMAisation
   9 * Copyright (c) 2006 Silicon Graphics, Inc.
  10 * by Greg Banks <gnb@melbourne.sgi.com>
  11 */
  12
  13#include <linux/linkage.h>
  14#include <linux/sched.h>
  15#include <linux/errno.h>
  16#include <linux/net.h>
  17#include <linux/in.h>
  18#include <linux/mm.h>
  19#include <linux/interrupt.h>
  20#include <linux/module.h>
  21#include <linux/kthread.h>
  22#include <linux/slab.h>
  23
  24#include <linux/sunrpc/types.h>
  25#include <linux/sunrpc/xdr.h>
  26#include <linux/sunrpc/stats.h>
  27#include <linux/sunrpc/svcsock.h>
  28#include <linux/sunrpc/clnt.h>
  29#include <linux/sunrpc/bc_xprt.h>
  30
  31#define RPCDBG_FACILITY RPCDBG_SVCDSP
  32
  33static void svc_unregister(const struct svc_serv *serv);
  34
  35#define svc_serv_is_pooled(serv)    ((serv)->sv_function)
  36
  37/*
  38 * Mode for mapping cpus to pools.
  39 */
  40enum {
  41        SVC_POOL_AUTO = -1,     /* choose one of the others */
  42        SVC_POOL_GLOBAL,        /* no mapping, just a single global pool
  43                                 * (legacy & UP mode) */
  44        SVC_POOL_PERCPU,        /* one pool per cpu */
  45        SVC_POOL_PERNODE        /* one pool per numa node */
  46};
  47#define SVC_POOL_DEFAULT        SVC_POOL_GLOBAL
  48
  49/*
  50 * Structure for mapping cpus to pools and vice versa.
  51 * Setup once during sunrpc initialisation.
  52 */
  53static struct svc_pool_map {
  54        int count;                      /* How many svc_servs use us */
  55        int mode;                       /* Note: int not enum to avoid
  56                                         * warnings about "enumeration value
  57                                         * not handled in switch" */
  58        unsigned int npools;
  59        unsigned int *pool_to;          /* maps pool id to cpu or node */
  60        unsigned int *to_pool;          /* maps cpu or node to pool id */
  61} svc_pool_map = {
  62        .count = 0,
  63        .mode = SVC_POOL_DEFAULT
  64};
  65static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */
  66
  67static int
  68param_set_pool_mode(const char *val, struct kernel_param *kp)
  69{
  70        int *ip = (int *)kp->arg;
  71        struct svc_pool_map *m = &svc_pool_map;
  72        int err;
  73
  74        mutex_lock(&svc_pool_map_mutex);
  75
  76        err = -EBUSY;
  77        if (m->count)
  78                goto out;
  79
  80        err = 0;
  81        if (!strncmp(val, "auto", 4))
  82                *ip = SVC_POOL_AUTO;
  83        else if (!strncmp(val, "global", 6))
  84                *ip = SVC_POOL_GLOBAL;
  85        else if (!strncmp(val, "percpu", 6))
  86                *ip = SVC_POOL_PERCPU;
  87        else if (!strncmp(val, "pernode", 7))
  88                *ip = SVC_POOL_PERNODE;
  89        else
  90                err = -EINVAL;
  91
  92out:
  93        mutex_unlock(&svc_pool_map_mutex);
  94        return err;
  95}
  96
  97static int
  98param_get_pool_mode(char *buf, struct kernel_param *kp)
  99{
 100        int *ip = (int *)kp->arg;
 101
 102        switch (*ip)
 103        {
 104        case SVC_POOL_AUTO:
 105                return strlcpy(buf, "auto", 20);
 106        case SVC_POOL_GLOBAL:
 107                return strlcpy(buf, "global", 20);
 108        case SVC_POOL_PERCPU:
 109                return strlcpy(buf, "percpu", 20);
 110        case SVC_POOL_PERNODE:
 111                return strlcpy(buf, "pernode", 20);
 112        default:
 113                return sprintf(buf, "%d", *ip);
 114        }
 115}
 116
 117module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
 118                 &svc_pool_map.mode, 0644);
 119
 120/*
 121 * Detect best pool mapping mode heuristically,
 122 * according to the machine's topology.
 123 */
 124static int
 125svc_pool_map_choose_mode(void)
 126{
 127        unsigned int node;
 128
 129        if (nr_online_nodes > 1) {
 130                /*
 131                 * Actually have multiple NUMA nodes,
 132                 * so split pools on NUMA node boundaries
 133                 */
 134                return SVC_POOL_PERNODE;
 135        }
 136
 137        node = first_online_node;
 138        if (nr_cpus_node(node) > 2) {
 139                /*
 140                 * Non-trivial SMP, or CONFIG_NUMA on
 141                 * non-NUMA hardware, e.g. with a generic
 142                 * x86_64 kernel on Xeons.  In this case we
 143                 * want to divide the pools on cpu boundaries.
 144                 */
 145                return SVC_POOL_PERCPU;
 146        }
 147
 148        /* default: one global pool */
 149        return SVC_POOL_GLOBAL;
 150}
 151
 152/*
 153 * Allocate the to_pool[] and pool_to[] arrays.
 154 * Returns 0 on success or an errno.
 155 */
 156static int
 157svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
 158{
 159        m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 160        if (!m->to_pool)
 161                goto fail;
 162        m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
 163        if (!m->pool_to)
 164                goto fail_free;
 165
 166        return 0;
 167
 168fail_free:
 169        kfree(m->to_pool);
 170        m->to_pool = NULL;
 171fail:
 172        return -ENOMEM;
 173}
 174
 175/*
 176 * Initialise the pool map for SVC_POOL_PERCPU mode.
 177 * Returns number of pools or <0 on error.
 178 */
 179static int
 180svc_pool_map_init_percpu(struct svc_pool_map *m)
 181{
 182        unsigned int maxpools = nr_cpu_ids;
 183        unsigned int pidx = 0;
 184        unsigned int cpu;
 185        int err;
 186
 187        err = svc_pool_map_alloc_arrays(m, maxpools);
 188        if (err)
 189                return err;
 190
 191        for_each_online_cpu(cpu) {
 192                BUG_ON(pidx > maxpools);
 193                m->to_pool[cpu] = pidx;
 194                m->pool_to[pidx] = cpu;
 195                pidx++;
 196        }
 197        /* cpus brought online later all get mapped to pool0, sorry */
 198
 199        return pidx;
 200};
 201
 202
 203/*
 204 * Initialise the pool map for SVC_POOL_PERNODE mode.
 205 * Returns number of pools or <0 on error.
 206 */
 207static int
 208svc_pool_map_init_pernode(struct svc_pool_map *m)
 209{
 210        unsigned int maxpools = nr_node_ids;
 211        unsigned int pidx = 0;
 212        unsigned int node;
 213        int err;
 214
 215        err = svc_pool_map_alloc_arrays(m, maxpools);
 216        if (err)
 217                return err;
 218
 219        for_each_node_with_cpus(node) {
 220                /* some architectures (e.g. SN2) have cpuless nodes */
 221                BUG_ON(pidx > maxpools);
 222                m->to_pool[node] = pidx;
 223                m->pool_to[pidx] = node;
 224                pidx++;
 225        }
 226        /* nodes brought online later all get mapped to pool0, sorry */
 227
 228        return pidx;
 229}
 230
 231
 232/*
 233 * Add a reference to the global map of cpus to pools (and
 234 * vice versa).  Initialise the map if we're the first user.
 235 * Returns the number of pools.
 236 */
 237static unsigned int
 238svc_pool_map_get(void)
 239{
 240        struct svc_pool_map *m = &svc_pool_map;
 241        int npools = -1;
 242
 243        mutex_lock(&svc_pool_map_mutex);
 244
 245        if (m->count++) {
 246                mutex_unlock(&svc_pool_map_mutex);
 247                return m->npools;
 248        }
 249
 250        if (m->mode == SVC_POOL_AUTO)
 251                m->mode = svc_pool_map_choose_mode();
 252
 253        switch (m->mode) {
 254        case SVC_POOL_PERCPU:
 255                npools = svc_pool_map_init_percpu(m);
 256                break;
 257        case SVC_POOL_PERNODE:
 258                npools = svc_pool_map_init_pernode(m);
 259                break;
 260        }
 261
 262        if (npools < 0) {
 263                /* default, or memory allocation failure */
 264                npools = 1;
 265                m->mode = SVC_POOL_GLOBAL;
 266        }
 267        m->npools = npools;
 268
 269        mutex_unlock(&svc_pool_map_mutex);
 270        return m->npools;
 271}
 272
 273
 274/*
 275 * Drop a reference to the global map of cpus to pools.
 276 * When the last reference is dropped, the map data is
 277 * freed; this allows the sysadmin to change the pool
 278 * mode using the pool_mode module option without
 279 * rebooting or re-loading sunrpc.ko.
 280 */
 281static void
 282svc_pool_map_put(void)
 283{
 284        struct svc_pool_map *m = &svc_pool_map;
 285
 286        mutex_lock(&svc_pool_map_mutex);
 287
 288        if (!--m->count) {
 289                m->mode = SVC_POOL_DEFAULT;
 290                kfree(m->to_pool);
 291                m->to_pool = NULL;
 292                kfree(m->pool_to);
 293                m->pool_to = NULL;
 294                m->npools = 0;
 295        }
 296
 297        mutex_unlock(&svc_pool_map_mutex);
 298}
 299
 300
 301static int svc_pool_map_get_node(unsigned int pidx)
 302{
 303        const struct svc_pool_map *m = &svc_pool_map;
 304
 305        if (m->count) {
 306                if (m->mode == SVC_POOL_PERCPU)
 307                        return cpu_to_node(m->pool_to[pidx]);
 308                if (m->mode == SVC_POOL_PERNODE)
 309                        return m->pool_to[pidx];
 310        }
 311        return NUMA_NO_NODE;
 312}
 313/*
 314 * Set the given thread's cpus_allowed mask so that it
 315 * will only run on cpus in the given pool.
 316 */
 317static inline void
 318svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
 319{
 320        struct svc_pool_map *m = &svc_pool_map;
 321        unsigned int node = m->pool_to[pidx];
 322
 323        /*
 324         * The caller checks for sv_nrpools > 1, which
 325         * implies that we've been initialized.
 326         */
 327        BUG_ON(m->count == 0);
 328
 329        switch (m->mode) {
 330        case SVC_POOL_PERCPU:
 331        {
 332                set_cpus_allowed_ptr(task, cpumask_of(node));
 333                break;
 334        }
 335        case SVC_POOL_PERNODE:
 336        {
 337                set_cpus_allowed_ptr(task, cpumask_of_node(node));
 338                break;
 339        }
 340        }
 341}
 342
 343/*
 344 * Use the mapping mode to choose a pool for a given CPU.
 345 * Used when enqueueing an incoming RPC.  Always returns
 346 * a non-NULL pool pointer.
 347 */
 348struct svc_pool *
 349svc_pool_for_cpu(struct svc_serv *serv, int cpu)
 350{
 351        struct svc_pool_map *m = &svc_pool_map;
 352        unsigned int pidx = 0;
 353
 354        /*
 355         * An uninitialised map happens in a pure client when
 356         * lockd is brought up, so silently treat it the
 357         * same as SVC_POOL_GLOBAL.
 358         */
 359        if (svc_serv_is_pooled(serv)) {
 360                switch (m->mode) {
 361                case SVC_POOL_PERCPU:
 362                        pidx = m->to_pool[cpu];
 363                        break;
 364                case SVC_POOL_PERNODE:
 365                        pidx = m->to_pool[cpu_to_node(cpu)];
 366                        break;
 367                }
 368        }
 369        return &serv->sv_pools[pidx % serv->sv_nrpools];
 370}
 371
 372static int svc_rpcb_setup(struct svc_serv *serv)
 373{
 374        int err;
 375
 376        err = rpcb_create_local();
 377        if (err)
 378                return err;
 379
 380        /* Remove any stale portmap registrations */
 381        svc_unregister(serv);
 382        return 0;
 383}
 384
 385void svc_rpcb_cleanup(struct svc_serv *serv)
 386{
 387        svc_unregister(serv);
 388        rpcb_put_local();
 389}
 390EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
 391
 392static int svc_uses_rpcbind(struct svc_serv *serv)
 393{
 394        struct svc_program      *progp;
 395        unsigned int            i;
 396
 397        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 398                for (i = 0; i < progp->pg_nvers; i++) {
 399                        if (progp->pg_vers[i] == NULL)
 400                                continue;
 401                        if (progp->pg_vers[i]->vs_hidden == 0)
 402                                return 1;
 403                }
 404        }
 405
 406        return 0;
 407}
 408
 409/*
 410 * Create an RPC service
 411 */
 412static struct svc_serv *
 413__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 414             void (*shutdown)(struct svc_serv *serv))
 415{
 416        struct svc_serv *serv;
 417        unsigned int vers;
 418        unsigned int xdrsize;
 419        unsigned int i;
 420
 421        if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
 422                return NULL;
 423        serv->sv_name      = prog->pg_name;
 424        serv->sv_program   = prog;
 425        serv->sv_nrthreads = 1;
 426        serv->sv_stats     = prog->pg_stats;
 427        if (bufsize > RPCSVC_MAXPAYLOAD)
 428                bufsize = RPCSVC_MAXPAYLOAD;
 429        serv->sv_max_payload = bufsize? bufsize : 4096;
 430        serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
 431        serv->sv_shutdown  = shutdown;
 432        xdrsize = 0;
 433        while (prog) {
 434                prog->pg_lovers = prog->pg_nvers-1;
 435                for (vers=0; vers<prog->pg_nvers ; vers++)
 436                        if (prog->pg_vers[vers]) {
 437                                prog->pg_hivers = vers;
 438                                if (prog->pg_lovers > vers)
 439                                        prog->pg_lovers = vers;
 440                                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
 441                                        xdrsize = prog->pg_vers[vers]->vs_xdrsize;
 442                        }
 443                prog = prog->pg_next;
 444        }
 445        serv->sv_xdrsize   = xdrsize;
 446        INIT_LIST_HEAD(&serv->sv_tempsocks);
 447        INIT_LIST_HEAD(&serv->sv_permsocks);
 448        init_timer(&serv->sv_temptimer);
 449        spin_lock_init(&serv->sv_lock);
 450
 451        serv->sv_nrpools = npools;
 452        serv->sv_pools =
 453                kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
 454                        GFP_KERNEL);
 455        if (!serv->sv_pools) {
 456                kfree(serv);
 457                return NULL;
 458        }
 459
 460        for (i = 0; i < serv->sv_nrpools; i++) {
 461                struct svc_pool *pool = &serv->sv_pools[i];
 462
 463                dprintk("svc: initialising pool %u for %s\n",
 464                                i, serv->sv_name);
 465
 466                pool->sp_id = i;
 467                INIT_LIST_HEAD(&pool->sp_threads);
 468                INIT_LIST_HEAD(&pool->sp_sockets);
 469                INIT_LIST_HEAD(&pool->sp_all_threads);
 470                spin_lock_init(&pool->sp_lock);
 471        }
 472
 473        if (svc_uses_rpcbind(serv)) {
 474                if (svc_rpcb_setup(serv) < 0) {
 475                        kfree(serv->sv_pools);
 476                        kfree(serv);
 477                        return NULL;
 478                }
 479                if (!serv->sv_shutdown)
 480                        serv->sv_shutdown = svc_rpcb_cleanup;
 481        }
 482
 483        return serv;
 484}
 485
 486struct svc_serv *
 487svc_create(struct svc_program *prog, unsigned int bufsize,
 488           void (*shutdown)(struct svc_serv *serv))
 489{
 490        return __svc_create(prog, bufsize, /*npools*/1, shutdown);
 491}
 492EXPORT_SYMBOL_GPL(svc_create);
 493
 494struct svc_serv *
 495svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 496                  void (*shutdown)(struct svc_serv *serv),
 497                  svc_thread_fn func, struct module *mod)
 498{
 499        struct svc_serv *serv;
 500        unsigned int npools = svc_pool_map_get();
 501
 502        serv = __svc_create(prog, bufsize, npools, shutdown);
 503
 504        if (serv != NULL) {
 505                serv->sv_function = func;
 506                serv->sv_module = mod;
 507        }
 508
 509        return serv;
 510}
 511EXPORT_SYMBOL_GPL(svc_create_pooled);
 512
 513/*
 514 * Destroy an RPC service. Should be called with appropriate locking to
 515 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 516 */
 517void
 518svc_destroy(struct svc_serv *serv)
 519{
 520        dprintk("svc: svc_destroy(%s, %d)\n",
 521                                serv->sv_program->pg_name,
 522                                serv->sv_nrthreads);
 523
 524        if (serv->sv_nrthreads) {
 525                if (--(serv->sv_nrthreads) != 0) {
 526                        svc_sock_update_bufs(serv);
 527                        return;
 528                }
 529        } else
 530                printk("svc_destroy: no threads for serv=%p!\n", serv);
 531
 532        del_timer_sync(&serv->sv_temptimer);
 533        /*
 534         * The set of xprts (contained in the sv_tempsocks and
 535         * sv_permsocks lists) is now constant, since it is modified
 536         * only by accepting new sockets (done by service threads in
 537         * svc_recv) or aging old ones (done by sv_temptimer), or
 538         * configuration changes (excluded by whatever locking the
 539         * caller is using--nfsd_mutex in the case of nfsd).  So it's
 540         * safe to traverse those lists and shut everything down:
 541         */
 542        svc_close_all(serv);
 543
 544        if (serv->sv_shutdown)
 545                serv->sv_shutdown(serv);
 546
 547        cache_clean_deferred(serv);
 548
 549        if (svc_serv_is_pooled(serv))
 550                svc_pool_map_put();
 551
 552        kfree(serv->sv_pools);
 553        kfree(serv);
 554}
 555EXPORT_SYMBOL_GPL(svc_destroy);
 556
 557/*
 558 * Allocate an RPC server's buffer space.
 559 * We allocate pages and place them in rq_argpages.
 560 */
 561static int
 562svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
 563{
 564        unsigned int pages, arghi;
 565
 566        /* bc_xprt uses fore channel allocated buffers */
 567        if (svc_is_backchannel(rqstp))
 568                return 1;
 569
 570        pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
 571                                       * We assume one is at most one page
 572                                       */
 573        arghi = 0;
 574        BUG_ON(pages > RPCSVC_MAXPAGES);
 575        while (pages) {
 576                struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
 577                if (!p)
 578                        break;
 579                rqstp->rq_pages[arghi++] = p;
 580                pages--;
 581        }
 582        return pages == 0;
 583}
 584
 585/*
 586 * Release an RPC server buffer
 587 */
 588static void
 589svc_release_buffer(struct svc_rqst *rqstp)
 590{
 591        unsigned int i;
 592
 593        for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
 594                if (rqstp->rq_pages[i])
 595                        put_page(rqstp->rq_pages[i]);
 596}
 597
 598struct svc_rqst *
 599svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 600{
 601        struct svc_rqst *rqstp;
 602
 603        rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
 604        if (!rqstp)
 605                goto out_enomem;
 606
 607        init_waitqueue_head(&rqstp->rq_wait);
 608
 609        serv->sv_nrthreads++;
 610        spin_lock_bh(&pool->sp_lock);
 611        pool->sp_nrthreads++;
 612        list_add(&rqstp->rq_all, &pool->sp_all_threads);
 613        spin_unlock_bh(&pool->sp_lock);
 614        rqstp->rq_server = serv;
 615        rqstp->rq_pool = pool;
 616
 617        rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 618        if (!rqstp->rq_argp)
 619                goto out_thread;
 620
 621        rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 622        if (!rqstp->rq_resp)
 623                goto out_thread;
 624
 625        if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
 626                goto out_thread;
 627
 628        return rqstp;
 629out_thread:
 630        svc_exit_thread(rqstp);
 631out_enomem:
 632        return ERR_PTR(-ENOMEM);
 633}
 634EXPORT_SYMBOL_GPL(svc_prepare_thread);
 635
 636/*
 637 * Choose a pool in which to create a new thread, for svc_set_num_threads
 638 */
 639static inline struct svc_pool *
 640choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 641{
 642        if (pool != NULL)
 643                return pool;
 644
 645        return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
 646}
 647
 648/*
 649 * Choose a thread to kill, for svc_set_num_threads
 650 */
 651static inline struct task_struct *
 652choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
 653{
 654        unsigned int i;
 655        struct task_struct *task = NULL;
 656
 657        if (pool != NULL) {
 658                spin_lock_bh(&pool->sp_lock);
 659        } else {
 660                /* choose a pool in round-robin fashion */
 661                for (i = 0; i < serv->sv_nrpools; i++) {
 662                        pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
 663                        spin_lock_bh(&pool->sp_lock);
 664                        if (!list_empty(&pool->sp_all_threads))
 665                                goto found_pool;
 666                        spin_unlock_bh(&pool->sp_lock);
 667                }
 668                return NULL;
 669        }
 670
 671found_pool:
 672        if (!list_empty(&pool->sp_all_threads)) {
 673                struct svc_rqst *rqstp;
 674
 675                /*
 676                 * Remove from the pool->sp_all_threads list
 677                 * so we don't try to kill it again.
 678                 */
 679                rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
 680                list_del_init(&rqstp->rq_all);
 681                task = rqstp->rq_task;
 682        }
 683        spin_unlock_bh(&pool->sp_lock);
 684
 685        return task;
 686}
 687
 688/*
 689 * Create or destroy enough new threads to make the number
 690 * of threads the given number.  If `pool' is non-NULL, applies
 691 * only to threads in that pool, otherwise round-robins between
 692 * all pools.  Must be called with a svc_get() reference and
 693 * the BKL or another lock to protect access to svc_serv fields.
 694 *
 695 * Destroying threads relies on the service threads filling in
 696 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 697 * has been created using svc_create_pooled().
 698 *
 699 * Based on code that used to be in nfsd_svc() but tweaked
 700 * to be pool-aware.
 701 */
 702int
 703svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 704{
 705        struct svc_rqst *rqstp;
 706        struct task_struct *task;
 707        struct svc_pool *chosen_pool;
 708        int error = 0;
 709        unsigned int state = serv->sv_nrthreads-1;
 710        int node;
 711
 712        if (pool == NULL) {
 713                /* The -1 assumes caller has done a svc_get() */
 714                nrservs -= (serv->sv_nrthreads-1);
 715        } else {
 716                spin_lock_bh(&pool->sp_lock);
 717                nrservs -= pool->sp_nrthreads;
 718                spin_unlock_bh(&pool->sp_lock);
 719        }
 720
 721        /* create new threads */
 722        while (nrservs > 0) {
 723                nrservs--;
 724                chosen_pool = choose_pool(serv, pool, &state);
 725
 726                node = svc_pool_map_get_node(chosen_pool->sp_id);
 727                rqstp = svc_prepare_thread(serv, chosen_pool, node);
 728                if (IS_ERR(rqstp)) {
 729                        error = PTR_ERR(rqstp);
 730                        break;
 731                }
 732
 733                __module_get(serv->sv_module);
 734                task = kthread_create_on_node(serv->sv_function, rqstp,
 735                                              node, serv->sv_name);
 736                if (IS_ERR(task)) {
 737                        error = PTR_ERR(task);
 738                        module_put(serv->sv_module);
 739                        svc_exit_thread(rqstp);
 740                        break;
 741                }
 742
 743                rqstp->rq_task = task;
 744                if (serv->sv_nrpools > 1)
 745                        svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
 746
 747                svc_sock_update_bufs(serv);
 748                wake_up_process(task);
 749        }
 750        /* destroy old threads */
 751        while (nrservs < 0 &&
 752               (task = choose_victim(serv, pool, &state)) != NULL) {
 753                send_sig(SIGINT, task, 1);
 754                nrservs++;
 755        }
 756
 757        return error;
 758}
 759EXPORT_SYMBOL_GPL(svc_set_num_threads);
 760
 761/*
 762 * Called from a server thread as it's exiting. Caller must hold the BKL or
 763 * the "service mutex", whichever is appropriate for the service.
 764 */
 765void
 766svc_exit_thread(struct svc_rqst *rqstp)
 767{
 768        struct svc_serv *serv = rqstp->rq_server;
 769        struct svc_pool *pool = rqstp->rq_pool;
 770
 771        svc_release_buffer(rqstp);
 772        kfree(rqstp->rq_resp);
 773        kfree(rqstp->rq_argp);
 774        kfree(rqstp->rq_auth_data);
 775
 776        spin_lock_bh(&pool->sp_lock);
 777        pool->sp_nrthreads--;
 778        list_del(&rqstp->rq_all);
 779        spin_unlock_bh(&pool->sp_lock);
 780
 781        kfree(rqstp);
 782
 783        /* Release the server */
 784        if (serv)
 785                svc_destroy(serv);
 786}
 787EXPORT_SYMBOL_GPL(svc_exit_thread);
 788
 789/*
 790 * Register an "inet" protocol family netid with the local
 791 * rpcbind daemon via an rpcbind v4 SET request.
 792 *
 793 * No netconfig infrastructure is available in the kernel, so
 794 * we map IP_ protocol numbers to netids by hand.
 795 *
 796 * Returns zero on success; a negative errno value is returned
 797 * if any error occurs.
 798 */
 799static int __svc_rpcb_register4(const u32 program, const u32 version,
 800                                const unsigned short protocol,
 801                                const unsigned short port)
 802{
 803        const struct sockaddr_in sin = {
 804                .sin_family             = AF_INET,
 805                .sin_addr.s_addr        = htonl(INADDR_ANY),
 806                .sin_port               = htons(port),
 807        };
 808        const char *netid;
 809        int error;
 810
 811        switch (protocol) {
 812        case IPPROTO_UDP:
 813                netid = RPCBIND_NETID_UDP;
 814                break;
 815        case IPPROTO_TCP:
 816                netid = RPCBIND_NETID_TCP;
 817                break;
 818        default:
 819                return -ENOPROTOOPT;
 820        }
 821
 822        error = rpcb_v4_register(program, version,
 823                                        (const struct sockaddr *)&sin, netid);
 824
 825        /*
 826         * User space didn't support rpcbind v4, so retry this
 827         * registration request with the legacy rpcbind v2 protocol.
 828         */
 829        if (error == -EPROTONOSUPPORT)
 830                error = rpcb_register(program, version, protocol, port);
 831
 832        return error;
 833}
 834
 835#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
 836/*
 837 * Register an "inet6" protocol family netid with the local
 838 * rpcbind daemon via an rpcbind v4 SET request.
 839 *
 840 * No netconfig infrastructure is available in the kernel, so
 841 * we map IP_ protocol numbers to netids by hand.
 842 *
 843 * Returns zero on success; a negative errno value is returned
 844 * if any error occurs.
 845 */
 846static int __svc_rpcb_register6(const u32 program, const u32 version,
 847                                const unsigned short protocol,
 848                                const unsigned short port)
 849{
 850        const struct sockaddr_in6 sin6 = {
 851                .sin6_family            = AF_INET6,
 852                .sin6_addr              = IN6ADDR_ANY_INIT,
 853                .sin6_port              = htons(port),
 854        };
 855        const char *netid;
 856        int error;
 857
 858        switch (protocol) {
 859        case IPPROTO_UDP:
 860                netid = RPCBIND_NETID_UDP6;
 861                break;
 862        case IPPROTO_TCP:
 863                netid = RPCBIND_NETID_TCP6;
 864                break;
 865        default:
 866                return -ENOPROTOOPT;
 867        }
 868
 869        error = rpcb_v4_register(program, version,
 870                                        (const struct sockaddr *)&sin6, netid);
 871
 872        /*
 873         * User space didn't support rpcbind version 4, so we won't
 874         * use a PF_INET6 listener.
 875         */
 876        if (error == -EPROTONOSUPPORT)
 877                error = -EAFNOSUPPORT;
 878
 879        return error;
 880}
 881#endif  /* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
 882
 883/*
 884 * Register a kernel RPC service via rpcbind version 4.
 885 *
 886 * Returns zero on success; a negative errno value is returned
 887 * if any error occurs.
 888 */
 889static int __svc_register(const char *progname,
 890                          const u32 program, const u32 version,
 891                          const int family,
 892                          const unsigned short protocol,
 893                          const unsigned short port)
 894{
 895        int error = -EAFNOSUPPORT;
 896
 897        switch (family) {
 898        case PF_INET:
 899                error = __svc_rpcb_register4(program, version,
 900                                                protocol, port);
 901                break;
 902#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
 903        case PF_INET6:
 904                error = __svc_rpcb_register6(program, version,
 905                                                protocol, port);
 906#endif  /* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
 907        }
 908
 909        if (error < 0)
 910                printk(KERN_WARNING "svc: failed to register %sv%u RPC "
 911                        "service (errno %d).\n", progname, version, -error);
 912        return error;
 913}
 914
 915/**
 916 * svc_register - register an RPC service with the local portmapper
 917 * @serv: svc_serv struct for the service to register
 918 * @family: protocol family of service's listener socket
 919 * @proto: transport protocol number to advertise
 920 * @port: port to advertise
 921 *
 922 * Service is registered for any address in the passed-in protocol family
 923 */
 924int svc_register(const struct svc_serv *serv, const int family,
 925                 const unsigned short proto, const unsigned short port)
 926{
 927        struct svc_program      *progp;
 928        unsigned int            i;
 929        int                     error = 0;
 930
 931        BUG_ON(proto == 0 && port == 0);
 932
 933        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
 934                for (i = 0; i < progp->pg_nvers; i++) {
 935                        if (progp->pg_vers[i] == NULL)
 936                                continue;
 937
 938                        dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
 939                                        progp->pg_name,
 940                                        i,
 941                                        proto == IPPROTO_UDP?  "udp" : "tcp",
 942                                        port,
 943                                        family,
 944                                        progp->pg_vers[i]->vs_hidden?
 945                                                " (but not telling portmap)" : "");
 946
 947                        if (progp->pg_vers[i]->vs_hidden)
 948                                continue;
 949
 950                        error = __svc_register(progp->pg_name, progp->pg_prog,
 951                                                i, family, proto, port);
 952                        if (error < 0)
 953                                break;
 954                }
 955        }
 956
 957        return error;
 958}
 959
 960/*
 961 * If user space is running rpcbind, it should take the v4 UNSET
 962 * and clear everything for this [program, version].  If user space
 963 * is running portmap, it will reject the v4 UNSET, but won't have
 964 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 965 * in this case to clear all existing entries for [program, version].
 966 */
 967static void __svc_unregister(const u32 program, const u32 version,
 968                             const char *progname)
 969{
 970        int error;
 971
 972        error = rpcb_v4_register(program, version, NULL, "");
 973
 974        /*
 975         * User space didn't support rpcbind v4, so retry this
 976         * request with the legacy rpcbind v2 protocol.
 977         */
 978        if (error == -EPROTONOSUPPORT)
 979                error = rpcb_register(program, version, 0, 0);
 980
 981        dprintk("svc: %s(%sv%u), error %d\n",
 982                        __func__, progname, version, error);
 983}
 984
 985/*
 986 * All netids, bind addresses and ports registered for [program, version]
 987 * are removed from the local rpcbind database (if the service is not
 988 * hidden) to make way for a new instance of the service.
 989 *
 990 * The result of unregistration is reported via dprintk for those who want
 991 * verification of the result, but is otherwise not important.
 992 */
 993static void svc_unregister(const struct svc_serv *serv)
 994{
 995        struct svc_program *progp;
 996        unsigned long flags;
 997        unsigned int i;
 998
 999        clear_thread_flag(TIF_SIGPENDING);
1000
1001        for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1002                for (i = 0; i < progp->pg_nvers; i++) {
1003                        if (progp->pg_vers[i] == NULL)
1004                                continue;
1005                        if (progp->pg_vers[i]->vs_hidden)
1006                                continue;
1007
1008                        dprintk("svc: attempting to unregister %sv%u\n",
1009                                progp->pg_name, i);
1010                        __svc_unregister(progp->pg_prog, i, progp->pg_name);
1011                }
1012        }
1013
1014        spin_lock_irqsave(&current->sighand->siglock, flags);
1015        recalc_sigpending();
1016        spin_unlock_irqrestore(&current->sighand->siglock, flags);
1017}
1018
1019/*
1020 * Printk the given error with the address of the client that caused it.
1021 */
1022static __printf(2, 3)
1023int svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1024{
1025        va_list args;
1026        int     r;
1027        char    buf[RPC_MAX_ADDRBUFLEN];
1028
1029        if (!net_ratelimit())
1030                return 0;
1031
1032        printk(KERN_WARNING "svc: %s: ",
1033                svc_print_addr(rqstp, buf, sizeof(buf)));
1034
1035        va_start(args, fmt);
1036        r = vprintk(fmt, args);
1037        va_end(args);
1038
1039        return r;
1040}
1041
1042/*
1043 * Common routine for processing the RPC request.
1044 */
1045static int
1046svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1047{
1048        struct svc_program      *progp;
1049        struct svc_version      *versp = NULL;  /* compiler food */
1050        struct svc_procedure    *procp = NULL;
1051        struct svc_serv         *serv = rqstp->rq_server;
1052        kxdrproc_t              xdr;
1053        __be32                  *statp;
1054        u32                     prog, vers, proc;
1055        __be32                  auth_stat, rpc_stat;
1056        int                     auth_res;
1057        __be32                  *reply_statp;
1058
1059        rpc_stat = rpc_success;
1060
1061        if (argv->iov_len < 6*4)
1062                goto err_short_len;
1063
1064        /* Will be turned off only in gss privacy case: */
1065        rqstp->rq_splice_ok = 1;
1066        /* Will be turned off only when NFSv4 Sessions are used */
1067        rqstp->rq_usedeferral = 1;
1068        rqstp->rq_dropme = false;
1069
1070        /* Setup reply header */
1071        rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
1072
1073        svc_putu32(resv, rqstp->rq_xid);
1074
1075        vers = svc_getnl(argv);
1076
1077        /* First words of reply: */
1078        svc_putnl(resv, 1);             /* REPLY */
1079
1080        if (vers != 2)          /* RPC version number */
1081                goto err_bad_rpc;
1082
1083        /* Save position in case we later decide to reject: */
1084        reply_statp = resv->iov_base + resv->iov_len;
1085
1086        svc_putnl(resv, 0);             /* ACCEPT */
1087
1088        rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
1089        rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
1090        rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
1091
1092        progp = serv->sv_program;
1093
1094        for (progp = serv->sv_program; progp; progp = progp->pg_next)
1095                if (prog == progp->pg_prog)
1096                        break;
1097
1098        /*
1099         * Decode auth data, and add verifier to reply buffer.
1100         * We do this before anything else in order to get a decent
1101         * auth verifier.
1102         */
1103        auth_res = svc_authenticate(rqstp, &auth_stat);
1104        /* Also give the program a chance to reject this call: */
1105        if (auth_res == SVC_OK && progp) {
1106                auth_stat = rpc_autherr_badcred;
1107                auth_res = progp->pg_authenticate(rqstp);
1108        }
1109        switch (auth_res) {
1110        case SVC_OK:
1111                break;
1112        case SVC_GARBAGE:
1113                goto err_garbage;
1114        case SVC_SYSERR:
1115                rpc_stat = rpc_system_err;
1116                goto err_bad;
1117        case SVC_DENIED:
1118                goto err_bad_auth;
1119        case SVC_CLOSE:
1120                if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1121                        svc_close_xprt(rqstp->rq_xprt);
1122        case SVC_DROP:
1123                goto dropit;
1124        case SVC_COMPLETE:
1125                goto sendit;
1126        }
1127
1128        if (progp == NULL)
1129                goto err_bad_prog;
1130
1131        if (vers >= progp->pg_nvers ||
1132          !(versp = progp->pg_vers[vers]))
1133                goto err_bad_vers;
1134
1135        procp = versp->vs_proc + proc;
1136        if (proc >= versp->vs_nproc || !procp->pc_func)
1137                goto err_bad_proc;
1138        rqstp->rq_procinfo = procp;
1139
1140        /* Syntactic check complete */
1141        serv->sv_stats->rpccnt++;
1142
1143        /* Build the reply header. */
1144        statp = resv->iov_base +resv->iov_len;
1145        svc_putnl(resv, RPC_SUCCESS);
1146
1147        /* Bump per-procedure stats counter */
1148        procp->pc_count++;
1149
1150        /* Initialize storage for argp and resp */
1151        memset(rqstp->rq_argp, 0, procp->pc_argsize);
1152        memset(rqstp->rq_resp, 0, procp->pc_ressize);
1153
1154        /* un-reserve some of the out-queue now that we have a
1155         * better idea of reply size
1156         */
1157        if (procp->pc_xdrressize)
1158                svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1159
1160        /* Call the function that processes the request. */
1161        if (!versp->vs_dispatch) {
1162                /* Decode arguments */
1163                xdr = procp->pc_decode;
1164                if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
1165                        goto err_garbage;
1166
1167                *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
1168
1169                /* Encode reply */
1170                if (rqstp->rq_dropme) {
1171                        if (procp->pc_release)
1172                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1173                        goto dropit;
1174                }
1175                if (*statp == rpc_success &&
1176                    (xdr = procp->pc_encode) &&
1177                    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1178                        dprintk("svc: failed to encode reply\n");
1179                        /* serv->sv_stats->rpcsystemerr++; */
1180                        *statp = rpc_system_err;
1181                }
1182        } else {
1183                dprintk("svc: calling dispatcher\n");
1184                if (!versp->vs_dispatch(rqstp, statp)) {
1185                        /* Release reply info */
1186                        if (procp->pc_release)
1187                                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1188                        goto dropit;
1189                }
1190        }
1191
1192        /* Check RPC status result */
1193        if (*statp != rpc_success)
1194                resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
1195
1196        /* Release reply info */
1197        if (procp->pc_release)
1198                procp->pc_release(rqstp, NULL, rqstp->rq_resp);
1199
1200        if (procp->pc_encode == NULL)
1201                goto dropit;
1202
1203 sendit:
1204        if (svc_authorise(rqstp))
1205                goto dropit;
1206        return 1;               /* Caller can now send it */
1207
1208 dropit:
1209        svc_authorise(rqstp);   /* doesn't hurt to call this twice */
1210        dprintk("svc: svc_process dropit\n");
1211        return 0;
1212
1213err_short_len:
1214        svc_printk(rqstp, "short len %Zd, dropping request\n",
1215                        argv->iov_len);
1216
1217        goto dropit;                    /* drop request */
1218
1219err_bad_rpc:
1220        serv->sv_stats->rpcbadfmt++;
1221        svc_putnl(resv, 1);     /* REJECT */
1222        svc_putnl(resv, 0);     /* RPC_MISMATCH */
1223        svc_putnl(resv, 2);     /* Only RPCv2 supported */
1224        svc_putnl(resv, 2);
1225        goto sendit;
1226
1227err_bad_auth:
1228        dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1229        serv->sv_stats->rpcbadauth++;
1230        /* Restore write pointer to location of accept status: */
1231        xdr_ressize_check(rqstp, reply_statp);
1232        svc_putnl(resv, 1);     /* REJECT */
1233        svc_putnl(resv, 1);     /* AUTH_ERROR */
1234        svc_putnl(resv, ntohl(auth_stat));      /* status */
1235        goto sendit;
1236
1237err_bad_prog:
1238        dprintk("svc: unknown program %d\n", prog);
1239        serv->sv_stats->rpcbadfmt++;
1240        svc_putnl(resv, RPC_PROG_UNAVAIL);
1241        goto sendit;
1242
1243err_bad_vers:
1244        svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1245                       vers, prog, progp->pg_name);
1246
1247        serv->sv_stats->rpcbadfmt++;
1248        svc_putnl(resv, RPC_PROG_MISMATCH);
1249        svc_putnl(resv, progp->pg_lovers);
1250        svc_putnl(resv, progp->pg_hivers);
1251        goto sendit;
1252
1253err_bad_proc:
1254        svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1255
1256        serv->sv_stats->rpcbadfmt++;
1257        svc_putnl(resv, RPC_PROC_UNAVAIL);
1258        goto sendit;
1259
1260err_garbage:
1261        svc_printk(rqstp, "failed to decode args\n");
1262
1263        rpc_stat = rpc_garbage_args;
1264err_bad:
1265        serv->sv_stats->rpcbadfmt++;
1266        svc_putnl(resv, ntohl(rpc_stat));
1267        goto sendit;
1268}
1269EXPORT_SYMBOL_GPL(svc_process);
1270
1271/*
1272 * Process the RPC request.
1273 */
1274int
1275svc_process(struct svc_rqst *rqstp)
1276{
1277        struct kvec             *argv = &rqstp->rq_arg.head[0];
1278        struct kvec             *resv = &rqstp->rq_res.head[0];
1279        struct svc_serv         *serv = rqstp->rq_server;
1280        u32                     dir;
1281
1282        /*
1283         * Setup response xdr_buf.
1284         * Initially it has just one page
1285         */
1286        rqstp->rq_resused = 1;
1287        resv->iov_base = page_address(rqstp->rq_respages[0]);
1288        resv->iov_len = 0;
1289        rqstp->rq_res.pages = rqstp->rq_respages + 1;
1290        rqstp->rq_res.len = 0;
1291        rqstp->rq_res.page_base = 0;
1292        rqstp->rq_res.page_len = 0;
1293        rqstp->rq_res.buflen = PAGE_SIZE;
1294        rqstp->rq_res.tail[0].iov_base = NULL;
1295        rqstp->rq_res.tail[0].iov_len = 0;
1296
1297        rqstp->rq_xid = svc_getu32(argv);
1298
1299        dir  = svc_getnl(argv);
1300        if (dir != 0) {
1301                /* direction != CALL */
1302                svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1303                serv->sv_stats->rpcbadfmt++;
1304                svc_drop(rqstp);
1305                return 0;
1306        }
1307
1308        /* Returns 1 for send, 0 for drop */
1309        if (svc_process_common(rqstp, argv, resv))
1310                return svc_send(rqstp);
1311        else {
1312                svc_drop(rqstp);
1313                return 0;
1314        }
1315}
1316
1317#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1318/*
1319 * Process a backchannel RPC request that arrived over an existing
1320 * outbound connection
1321 */
1322int
1323bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1324               struct svc_rqst *rqstp)
1325{
1326        struct kvec     *argv = &rqstp->rq_arg.head[0];
1327        struct kvec     *resv = &rqstp->rq_res.head[0];
1328
1329        /* Build the svc_rqst used by the common processing routine */
1330        rqstp->rq_xprt = serv->sv_bc_xprt;
1331        rqstp->rq_xid = req->rq_xid;
1332        rqstp->rq_prot = req->rq_xprt->prot;
1333        rqstp->rq_server = serv;
1334
1335        rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
1336        memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
1337        memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
1338        memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1339
1340        /* reset result send buffer "put" position */
1341        resv->iov_len = 0;
1342
1343        if (rqstp->rq_prot != IPPROTO_TCP) {
1344                printk(KERN_ERR "No support for Non-TCP transports!\n");
1345                BUG();
1346        }
1347
1348        /*
1349         * Skip the next two words because they've already been
1350         * processed in the trasport
1351         */
1352        svc_getu32(argv);       /* XID */
1353        svc_getnl(argv);        /* CALLDIR */
1354
1355        /* Returns 1 for send, 0 for drop */
1356        if (svc_process_common(rqstp, argv, resv)) {
1357                memcpy(&req->rq_snd_buf, &rqstp->rq_res,
1358                                                sizeof(req->rq_snd_buf));
1359                return bc_send(req);
1360        } else {
1361                /* Nothing to do to drop request */
1362                return 0;
1363        }
1364}
1365EXPORT_SYMBOL_GPL(bc_svc_process);
1366#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1367
1368/*
1369 * Return (transport-specific) limit on the rpc payload.
1370 */
1371u32 svc_max_payload(const struct svc_rqst *rqstp)
1372{
1373        u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1374
1375        if (rqstp->rq_server->sv_max_payload < max)
1376                max = rqstp->rq_server->sv_max_payload;
1377        return max;
1378}
1379EXPORT_SYMBOL_GPL(svc_max_payload);
1380
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.