linux/net/tipc/name_table.c
<<
>>
Prefs
   1/*
   2 * net/tipc/name_table.c: TIPC name table code
   3 *
   4 * Copyright (c) 2000-2006, Ericsson AB
   5 * Copyright (c) 2004-2008, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "config.h"
  39#include "dbg.h"
  40#include "name_table.h"
  41#include "name_distr.h"
  42#include "addr.h"
  43#include "node_subscr.h"
  44#include "subscr.h"
  45#include "port.h"
  46#include "cluster.h"
  47#include "bcast.h"
  48
  49static int tipc_nametbl_size = 1024;            /* must be a power of 2 */
  50
  51/**
  52 * struct sub_seq - container for all published instances of a name sequence
  53 * @lower: name sequence lower bound
  54 * @upper: name sequence upper bound
  55 * @node_list: circular list of publications made by own node
  56 * @cluster_list: circular list of publications made by own cluster
  57 * @zone_list: circular list of publications made by own zone
  58 * @node_list_size: number of entries in "node_list"
  59 * @cluster_list_size: number of entries in "cluster_list"
  60 * @zone_list_size: number of entries in "zone_list"
  61 *
  62 * Note: The zone list always contains at least one entry, since all
  63 *       publications of the associated name sequence belong to it.
  64 *       (The cluster and node lists may be empty.)
  65 */
  66
  67struct sub_seq {
  68        u32 lower;
  69        u32 upper;
  70        struct publication *node_list;
  71        struct publication *cluster_list;
  72        struct publication *zone_list;
  73        u32 node_list_size;
  74        u32 cluster_list_size;
  75        u32 zone_list_size;
  76};
  77
  78/**
  79 * struct name_seq - container for all published instances of a name type
  80 * @type: 32 bit 'type' value for name sequence
  81 * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type';
  82 *        sub-sequences are sorted in ascending order
  83 * @alloc: number of sub-sequences currently in array
  84 * @first_free: array index of first unused sub-sequence entry
  85 * @ns_list: links to adjacent name sequences in hash chain
  86 * @subscriptions: list of subscriptions for this 'type'
  87 * @lock: spinlock controlling access to publication lists of all sub-sequences
  88 */
  89
  90struct name_seq {
  91        u32 type;
  92        struct sub_seq *sseqs;
  93        u32 alloc;
  94        u32 first_free;
  95        struct hlist_node ns_list;
  96        struct list_head subscriptions;
  97        spinlock_t lock;
  98};
  99
 100/**
 101 * struct name_table - table containing all existing port name publications
 102 * @types: pointer to fixed-sized array of name sequence lists,
 103 *         accessed via hashing on 'type'; name sequence lists are *not* sorted
 104 * @local_publ_count: number of publications issued by this node
 105 */
 106
 107struct name_table {
 108        struct hlist_head *types;
 109        u32 local_publ_count;
 110};
 111
 112static struct name_table table = { NULL } ;
 113static atomic_t rsv_publ_ok = ATOMIC_INIT(0);
 114DEFINE_RWLOCK(tipc_nametbl_lock);
 115
 116
 117static int hash(int x)
 118{
 119        return(x & (tipc_nametbl_size - 1));
 120}
 121
 122/**
 123 * publ_create - create a publication structure
 124 */
 125
 126static struct publication *publ_create(u32 type, u32 lower, u32 upper,
 127                                       u32 scope, u32 node, u32 port_ref,
 128                                       u32 key)
 129{
 130        struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC);
 131        if (publ == NULL) {
 132                warn("Publication creation failure, no memory\n");
 133                return NULL;
 134        }
 135
 136        publ->type = type;
 137        publ->lower = lower;
 138        publ->upper = upper;
 139        publ->scope = scope;
 140        publ->node = node;
 141        publ->ref = port_ref;
 142        publ->key = key;
 143        INIT_LIST_HEAD(&publ->local_list);
 144        INIT_LIST_HEAD(&publ->pport_list);
 145        INIT_LIST_HEAD(&publ->subscr.nodesub_list);
 146        return publ;
 147}
 148
 149/**
 150 * tipc_subseq_alloc - allocate a specified number of sub-sequence structures
 151 */
 152
 153static struct sub_seq *tipc_subseq_alloc(u32 cnt)
 154{
 155        struct sub_seq *sseq = kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC);
 156        return sseq;
 157}
 158
 159/**
 160 * tipc_nameseq_create - create a name sequence structure for the specified 'type'
 161 *
 162 * Allocates a single sub-sequence structure and sets it to all 0's.
 163 */
 164
 165static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head)
 166{
 167        struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC);
 168        struct sub_seq *sseq = tipc_subseq_alloc(1);
 169
 170        if (!nseq || !sseq) {
 171                warn("Name sequence creation failed, no memory\n");
 172                kfree(nseq);
 173                kfree(sseq);
 174                return NULL;
 175        }
 176
 177        spin_lock_init(&nseq->lock);
 178        nseq->type = type;
 179        nseq->sseqs = sseq;
 180        dbg("tipc_nameseq_create(): nseq = %p, type %u, ssseqs %p, ff: %u\n",
 181            nseq, type, nseq->sseqs, nseq->first_free);
 182        nseq->alloc = 1;
 183        INIT_HLIST_NODE(&nseq->ns_list);
 184        INIT_LIST_HEAD(&nseq->subscriptions);
 185        hlist_add_head(&nseq->ns_list, seq_head);
 186        return nseq;
 187}
 188
 189/**
 190 * nameseq_find_subseq - find sub-sequence (if any) matching a name instance
 191 *
 192 * Very time-critical, so binary searches through sub-sequence array.
 193 */
 194
 195static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq,
 196                                           u32 instance)
 197{
 198        struct sub_seq *sseqs = nseq->sseqs;
 199        int low = 0;
 200        int high = nseq->first_free - 1;
 201        int mid;
 202
 203        while (low <= high) {
 204                mid = (low + high) / 2;
 205                if (instance < sseqs[mid].lower)
 206                        high = mid - 1;
 207                else if (instance > sseqs[mid].upper)
 208                        low = mid + 1;
 209                else
 210                        return &sseqs[mid];
 211        }
 212        return NULL;
 213}
 214
 215/**
 216 * nameseq_locate_subseq - determine position of name instance in sub-sequence
 217 *
 218 * Returns index in sub-sequence array of the entry that contains the specified
 219 * instance value; if no entry contains that value, returns the position
 220 * where a new entry for it would be inserted in the array.
 221 *
 222 * Note: Similar to binary search code for locating a sub-sequence.
 223 */
 224
 225static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
 226{
 227        struct sub_seq *sseqs = nseq->sseqs;
 228        int low = 0;
 229        int high = nseq->first_free - 1;
 230        int mid;
 231
 232        while (low <= high) {
 233                mid = (low + high) / 2;
 234                if (instance < sseqs[mid].lower)
 235                        high = mid - 1;
 236                else if (instance > sseqs[mid].upper)
 237                        low = mid + 1;
 238                else
 239                        return mid;
 240        }
 241        return low;
 242}
 243
 244/**
 245 * tipc_nameseq_insert_publ -
 246 */
 247
 248static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
 249                                                    u32 type, u32 lower, u32 upper,
 250                                                    u32 scope, u32 node, u32 port, u32 key)
 251{
 252        struct subscription *s;
 253        struct subscription *st;
 254        struct publication *publ;
 255        struct sub_seq *sseq;
 256        int created_subseq = 0;
 257
 258        sseq = nameseq_find_subseq(nseq, lower);
 259        dbg("nameseq_ins: for seq %p, {%u,%u}, found sseq %p\n",
 260            nseq, type, lower, sseq);
 261        if (sseq) {
 262
 263                /* Lower end overlaps existing entry => need an exact match */
 264
 265                if ((sseq->lower != lower) || (sseq->upper != upper)) {
 266                        warn("Cannot publish {%u,%u,%u}, overlap error\n",
 267                             type, lower, upper);
 268                        return NULL;
 269                }
 270        } else {
 271                u32 inspos;
 272                struct sub_seq *freesseq;
 273
 274                /* Find where lower end should be inserted */
 275
 276                inspos = nameseq_locate_subseq(nseq, lower);
 277
 278                /* Fail if upper end overlaps into an existing entry */
 279
 280                if ((inspos < nseq->first_free) &&
 281                    (upper >= nseq->sseqs[inspos].lower)) {
 282                        warn("Cannot publish {%u,%u,%u}, overlap error\n",
 283                             type, lower, upper);
 284                        return NULL;
 285                }
 286
 287                /* Ensure there is space for new sub-sequence */
 288
 289                if (nseq->first_free == nseq->alloc) {
 290                        struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2);
 291
 292                        if (!sseqs) {
 293                                warn("Cannot publish {%u,%u,%u}, no memory\n",
 294                                     type, lower, upper);
 295                                return NULL;
 296                        }
 297                        dbg("Allocated %u more sseqs\n", nseq->alloc);
 298                        memcpy(sseqs, nseq->sseqs,
 299                               nseq->alloc * sizeof(struct sub_seq));
 300                        kfree(nseq->sseqs);
 301                        nseq->sseqs = sseqs;
 302                        nseq->alloc *= 2;
 303                }
 304                dbg("Have %u sseqs for type %u\n", nseq->alloc, type);
 305
 306                /* Insert new sub-sequence */
 307
 308                dbg("ins in pos %u, ff = %u\n", inspos, nseq->first_free);
 309                sseq = &nseq->sseqs[inspos];
 310                freesseq = &nseq->sseqs[nseq->first_free];
 311                memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof (*sseq));
 312                memset(sseq, 0, sizeof (*sseq));
 313                nseq->first_free++;
 314                sseq->lower = lower;
 315                sseq->upper = upper;
 316                created_subseq = 1;
 317        }
 318        dbg("inserting {%u,%u,%u} from <0x%x:%u> into sseq %p(%u,%u) of seq %p\n",
 319            type, lower, upper, node, port, sseq,
 320            sseq->lower, sseq->upper, nseq);
 321
 322        /* Insert a publication: */
 323
 324        publ = publ_create(type, lower, upper, scope, node, port, key);
 325        if (!publ)
 326                return NULL;
 327        dbg("inserting publ %p, node=0x%x publ->node=0x%x, subscr->node=%p\n",
 328            publ, node, publ->node, publ->subscr.node);
 329
 330        sseq->zone_list_size++;
 331        if (!sseq->zone_list)
 332                sseq->zone_list = publ->zone_list_next = publ;
 333        else {
 334                publ->zone_list_next = sseq->zone_list->zone_list_next;
 335                sseq->zone_list->zone_list_next = publ;
 336        }
 337
 338        if (in_own_cluster(node)) {
 339                sseq->cluster_list_size++;
 340                if (!sseq->cluster_list)
 341                        sseq->cluster_list = publ->cluster_list_next = publ;
 342                else {
 343                        publ->cluster_list_next =
 344                        sseq->cluster_list->cluster_list_next;
 345                        sseq->cluster_list->cluster_list_next = publ;
 346                }
 347        }
 348
 349        if (node == tipc_own_addr) {
 350                sseq->node_list_size++;
 351                if (!sseq->node_list)
 352                        sseq->node_list = publ->node_list_next = publ;
 353                else {
 354                        publ->node_list_next = sseq->node_list->node_list_next;
 355                        sseq->node_list->node_list_next = publ;
 356                }
 357        }
 358
 359        /*
 360         * Any subscriptions waiting for notification?
 361         */
 362        list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
 363                dbg("calling report_overlap()\n");
 364                tipc_subscr_report_overlap(s,
 365                                           publ->lower,
 366                                           publ->upper,
 367                                           TIPC_PUBLISHED,
 368                                           publ->ref,
 369                                           publ->node,
 370                                           created_subseq);
 371        }
 372        return publ;
 373}
 374
 375/**
 376 * tipc_nameseq_remove_publ -
 377 *
 378 * NOTE: There may be cases where TIPC is asked to remove a publication
 379 * that is not in the name table.  For example, if another node issues a
 380 * publication for a name sequence that overlaps an existing name sequence
 381 * the publication will not be recorded, which means the publication won't
 382 * be found when the name sequence is later withdrawn by that node.
 383 * A failed withdraw request simply returns a failure indication and lets the
 384 * caller issue any error or warning messages associated with such a problem.
 385 */
 386
 387static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 inst,
 388                                                    u32 node, u32 ref, u32 key)
 389{
 390        struct publication *publ;
 391        struct publication *curr;
 392        struct publication *prev;
 393        struct sub_seq *sseq = nameseq_find_subseq(nseq, inst);
 394        struct sub_seq *free;
 395        struct subscription *s, *st;
 396        int removed_subseq = 0;
 397
 398        if (!sseq)
 399                return NULL;
 400
 401        dbg("tipc_nameseq_remove_publ: seq: %p, sseq %p, {%u,%u}, key %u\n",
 402            nseq, sseq, nseq->type, inst, key);
 403
 404        /* Remove publication from zone scope list */
 405
 406        prev = sseq->zone_list;
 407        publ = sseq->zone_list->zone_list_next;
 408        while ((publ->key != key) || (publ->ref != ref) ||
 409               (publ->node && (publ->node != node))) {
 410                prev = publ;
 411                publ = publ->zone_list_next;
 412                if (prev == sseq->zone_list) {
 413
 414                        /* Prevent endless loop if publication not found */
 415
 416                        return NULL;
 417                }
 418        }
 419        if (publ != sseq->zone_list)
 420                prev->zone_list_next = publ->zone_list_next;
 421        else if (publ->zone_list_next != publ) {
 422                prev->zone_list_next = publ->zone_list_next;
 423                sseq->zone_list = publ->zone_list_next;
 424        } else {
 425                sseq->zone_list = NULL;
 426        }
 427        sseq->zone_list_size--;
 428
 429        /* Remove publication from cluster scope list, if present */
 430
 431        if (in_own_cluster(node)) {
 432                prev = sseq->cluster_list;
 433                curr = sseq->cluster_list->cluster_list_next;
 434                while (curr != publ) {
 435                        prev = curr;
 436                        curr = curr->cluster_list_next;
 437                        if (prev == sseq->cluster_list) {
 438
 439                                /* Prevent endless loop for malformed list */
 440
 441                                err("Unable to de-list cluster publication\n"
 442                                    "{%u%u}, node=0x%x, ref=%u, key=%u)\n",
 443                                    publ->type, publ->lower, publ->node,
 444                                    publ->ref, publ->key);
 445                                goto end_cluster;
 446                        }
 447                }
 448                if (publ != sseq->cluster_list)
 449                        prev->cluster_list_next = publ->cluster_list_next;
 450                else if (publ->cluster_list_next != publ) {
 451                        prev->cluster_list_next = publ->cluster_list_next;
 452                        sseq->cluster_list = publ->cluster_list_next;
 453                } else {
 454                        sseq->cluster_list = NULL;
 455                }
 456                sseq->cluster_list_size--;
 457        }
 458end_cluster:
 459
 460        /* Remove publication from node scope list, if present */
 461
 462        if (node == tipc_own_addr) {
 463                prev = sseq->node_list;
 464                curr = sseq->node_list->node_list_next;
 465                while (curr != publ) {
 466                        prev = curr;
 467                        curr = curr->node_list_next;
 468                        if (prev == sseq->node_list) {
 469
 470                                /* Prevent endless loop for malformed list */
 471
 472                                err("Unable to de-list node publication\n"
 473                                    "{%u%u}, node=0x%x, ref=%u, key=%u)\n",
 474                                    publ->type, publ->lower, publ->node,
 475                                    publ->ref, publ->key);
 476                                goto end_node;
 477                        }
 478                }
 479                if (publ != sseq->node_list)
 480                        prev->node_list_next = publ->node_list_next;
 481                else if (publ->node_list_next != publ) {
 482                        prev->node_list_next = publ->node_list_next;
 483                        sseq->node_list = publ->node_list_next;
 484                } else {
 485                        sseq->node_list = NULL;
 486                }
 487                sseq->node_list_size--;
 488        }
 489end_node:
 490
 491        /* Contract subseq list if no more publications for that subseq */
 492
 493        if (!sseq->zone_list) {
 494                free = &nseq->sseqs[nseq->first_free--];
 495                memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof (*sseq));
 496                removed_subseq = 1;
 497        }
 498
 499        /* Notify any waiting subscriptions */
 500
 501        list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
 502                tipc_subscr_report_overlap(s,
 503                                           publ->lower,
 504                                           publ->upper,
 505                                           TIPC_WITHDRAWN,
 506                                           publ->ref,
 507                                           publ->node,
 508                                           removed_subseq);
 509        }
 510
 511        return publ;
 512}
 513
 514/**
 515 * tipc_nameseq_subscribe: attach a subscription, and issue
 516 * the prescribed number of events if there is any sub-
 517 * sequence overlapping with the requested sequence
 518 */
 519
 520static void tipc_nameseq_subscribe(struct name_seq *nseq, struct subscription *s)
 521{
 522        struct sub_seq *sseq = nseq->sseqs;
 523
 524        list_add(&s->nameseq_list, &nseq->subscriptions);
 525
 526        if (!sseq)
 527                return;
 528
 529        while (sseq != &nseq->sseqs[nseq->first_free]) {
 530                struct publication *zl = sseq->zone_list;
 531                if (zl && tipc_subscr_overlap(s,sseq->lower,sseq->upper)) {
 532                        struct publication *crs = zl;
 533                        int must_report = 1;
 534
 535                        do {
 536                                tipc_subscr_report_overlap(s,
 537                                                           sseq->lower,
 538                                                           sseq->upper,
 539                                                           TIPC_PUBLISHED,
 540                                                           crs->ref,
 541                                                           crs->node,
 542                                                           must_report);
 543                                must_report = 0;
 544                                crs = crs->zone_list_next;
 545                        } while (crs != zl);
 546                }
 547                sseq++;
 548        }
 549}
 550
 551static struct name_seq *nametbl_find_seq(u32 type)
 552{
 553        struct hlist_head *seq_head;
 554        struct hlist_node *seq_node;
 555        struct name_seq *ns;
 556
 557        dbg("find_seq %u,(%u,0x%x) table = %p, hash[type] = %u\n",
 558            type, htonl(type), type, table.types, hash(type));
 559
 560        seq_head = &table.types[hash(type)];
 561        hlist_for_each_entry(ns, seq_node, seq_head, ns_list) {
 562                if (ns->type == type) {
 563                        dbg("found %p\n", ns);
 564                        return ns;
 565                }
 566        }
 567
 568        return NULL;
 569};
 570
 571struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
 572                                             u32 scope, u32 node, u32 port, u32 key)
 573{
 574        struct name_seq *seq = nametbl_find_seq(type);
 575
 576        dbg("tipc_nametbl_insert_publ: {%u,%u,%u} found %p\n", type, lower, upper, seq);
 577        if (lower > upper) {
 578                warn("Failed to publish illegal {%u,%u,%u}\n",
 579                     type, lower, upper);
 580                return NULL;
 581        }
 582
 583        dbg("Publishing {%u,%u,%u} from 0x%x\n", type, lower, upper, node);
 584        if (!seq) {
 585                seq = tipc_nameseq_create(type, &table.types[hash(type)]);
 586                dbg("tipc_nametbl_insert_publ: created %p\n", seq);
 587        }
 588        if (!seq)
 589                return NULL;
 590
 591        return tipc_nameseq_insert_publ(seq, type, lower, upper,
 592                                        scope, node, port, key);
 593}
 594
 595struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
 596                                             u32 node, u32 ref, u32 key)
 597{
 598        struct publication *publ;
 599        struct name_seq *seq = nametbl_find_seq(type);
 600
 601        if (!seq)
 602                return NULL;
 603
 604        dbg("Withdrawing {%u,%u} from 0x%x\n", type, lower, node);
 605        publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
 606
 607        if (!seq->first_free && list_empty(&seq->subscriptions)) {
 608                hlist_del_init(&seq->ns_list);
 609                kfree(seq->sseqs);
 610                kfree(seq);
 611        }
 612        return publ;
 613}
 614
 615/*
 616 * tipc_nametbl_translate(): Translate tipc_name -> tipc_portid.
 617 *                      Very time-critical.
 618 *
 619 * Note: on entry 'destnode' is the search domain used during translation;
 620 *       on exit it passes back the node address of the matching port (if any)
 621 */
 622
 623u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 624{
 625        struct sub_seq *sseq;
 626        struct publication *publ = NULL;
 627        struct name_seq *seq;
 628        u32 ref;
 629
 630        if (!tipc_in_scope(*destnode, tipc_own_addr))
 631                return 0;
 632
 633        read_lock_bh(&tipc_nametbl_lock);
 634        seq = nametbl_find_seq(type);
 635        if (unlikely(!seq))
 636                goto not_found;
 637        sseq = nameseq_find_subseq(seq, instance);
 638        if (unlikely(!sseq))
 639                goto not_found;
 640        spin_lock_bh(&seq->lock);
 641
 642        /* Closest-First Algorithm: */
 643        if (likely(!*destnode)) {
 644                publ = sseq->node_list;
 645                if (publ) {
 646                        sseq->node_list = publ->node_list_next;
 647found:
 648                        ref = publ->ref;
 649                        *destnode = publ->node;
 650                        spin_unlock_bh(&seq->lock);
 651                        read_unlock_bh(&tipc_nametbl_lock);
 652                        return ref;
 653                }
 654                publ = sseq->cluster_list;
 655                if (publ) {
 656                        sseq->cluster_list = publ->cluster_list_next;
 657                        goto found;
 658                }
 659                publ = sseq->zone_list;
 660                if (publ) {
 661                        sseq->zone_list = publ->zone_list_next;
 662                        goto found;
 663                }
 664        }
 665
 666        /* Round-Robin Algorithm: */
 667        else if (*destnode == tipc_own_addr) {
 668                publ = sseq->node_list;
 669                if (publ) {
 670                        sseq->node_list = publ->node_list_next;
 671                        goto found;
 672                }
 673        } else if (in_own_cluster(*destnode)) {
 674                publ = sseq->cluster_list;
 675                if (publ) {
 676                        sseq->cluster_list = publ->cluster_list_next;
 677                        goto found;
 678                }
 679        } else {
 680                publ = sseq->zone_list;
 681                if (publ) {
 682                        sseq->zone_list = publ->zone_list_next;
 683                        goto found;
 684                }
 685        }
 686        spin_unlock_bh(&seq->lock);
 687not_found:
 688        *destnode = 0;
 689        read_unlock_bh(&tipc_nametbl_lock);
 690        return 0;
 691}
 692
 693/**
 694 * tipc_nametbl_mc_translate - find multicast destinations
 695 *
 696 * Creates list of all local ports that overlap the given multicast address;
 697 * also determines if any off-node ports overlap.
 698 *
 699 * Note: Publications with a scope narrower than 'limit' are ignored.
 700 * (i.e. local node-scope publications mustn't receive messages arriving
 701 * from another node, even if the multcast link brought it here)
 702 *
 703 * Returns non-zero if any off-node ports overlap
 704 */
 705
 706int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 707                              struct port_list *dports)
 708{
 709        struct name_seq *seq;
 710        struct sub_seq *sseq;
 711        struct sub_seq *sseq_stop;
 712        int res = 0;
 713
 714        read_lock_bh(&tipc_nametbl_lock);
 715        seq = nametbl_find_seq(type);
 716        if (!seq)
 717                goto exit;
 718
 719        spin_lock_bh(&seq->lock);
 720
 721        sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
 722        sseq_stop = seq->sseqs + seq->first_free;
 723        for (; sseq != sseq_stop; sseq++) {
 724                struct publication *publ;
 725
 726                if (sseq->lower > upper)
 727                        break;
 728
 729                publ = sseq->node_list;
 730                if (publ) {
 731                        do {
 732                                if (publ->scope <= limit)
 733                                        tipc_port_list_add(dports, publ->ref);
 734                                publ = publ->node_list_next;
 735                        } while (publ != sseq->node_list);
 736                }
 737
 738                if (sseq->cluster_list_size != sseq->node_list_size)
 739                        res = 1;
 740        }
 741
 742        spin_unlock_bh(&seq->lock);
 743exit:
 744        read_unlock_bh(&tipc_nametbl_lock);
 745        return res;
 746}
 747
 748/**
 749 * tipc_nametbl_publish_rsv - publish port name using a reserved name type
 750 */
 751
 752int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
 753                        struct tipc_name_seq const *seq)
 754{
 755        int res;
 756
 757        atomic_inc(&rsv_publ_ok);
 758        res = tipc_publish(ref, scope, seq);
 759        atomic_dec(&rsv_publ_ok);
 760        return res;
 761}
 762
 763/**
 764 * tipc_nametbl_publish - add name publication to network name tables
 765 */
 766
 767struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 768                                    u32 scope, u32 port_ref, u32 key)
 769{
 770        struct publication *publ;
 771
 772        if (table.local_publ_count >= tipc_max_publications) {
 773                warn("Publication failed, local publication limit reached (%u)\n",
 774                     tipc_max_publications);
 775                return NULL;
 776        }
 777        if ((type < TIPC_RESERVED_TYPES) && !atomic_read(&rsv_publ_ok)) {
 778                warn("Publication failed, reserved name {%u,%u,%u}\n",
 779                     type, lower, upper);
 780                return NULL;
 781        }
 782
 783        write_lock_bh(&tipc_nametbl_lock);
 784        table.local_publ_count++;
 785        publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
 786                                   tipc_own_addr, port_ref, key);
 787        if (publ && (scope != TIPC_NODE_SCOPE)) {
 788                tipc_named_publish(publ);
 789        }
 790        write_unlock_bh(&tipc_nametbl_lock);
 791        return publ;
 792}
 793
 794/**
 795 * tipc_nametbl_withdraw - withdraw name publication from network name tables
 796 */
 797
 798int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 799{
 800        struct publication *publ;
 801
 802        dbg("tipc_nametbl_withdraw: {%u,%u}, key=%u\n", type, lower, key);
 803        write_lock_bh(&tipc_nametbl_lock);
 804        publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
 805        if (likely(publ)) {
 806                table.local_publ_count--;
 807                if (publ->scope != TIPC_NODE_SCOPE)
 808                        tipc_named_withdraw(publ);
 809                write_unlock_bh(&tipc_nametbl_lock);
 810                list_del_init(&publ->pport_list);
 811                kfree(publ);
 812                return 1;
 813        }
 814        write_unlock_bh(&tipc_nametbl_lock);
 815        err("Unable to remove local publication\n"
 816            "(type=%u, lower=%u, ref=%u, key=%u)\n",
 817            type, lower, ref, key);
 818        return 0;
 819}
 820
 821/**
 822 * tipc_nametbl_subscribe - add a subscription object to the name table
 823 */
 824
 825void tipc_nametbl_subscribe(struct subscription *s)
 826{
 827        u32 type = s->seq.type;
 828        struct name_seq *seq;
 829
 830        write_lock_bh(&tipc_nametbl_lock);
 831        seq = nametbl_find_seq(type);
 832        if (!seq) {
 833                seq = tipc_nameseq_create(type, &table.types[hash(type)]);
 834        }
 835        if (seq){
 836                spin_lock_bh(&seq->lock);
 837                dbg("tipc_nametbl_subscribe:found %p for {%u,%u,%u}\n",
 838                    seq, type, s->seq.lower, s->seq.upper);
 839                tipc_nameseq_subscribe(seq, s);
 840                spin_unlock_bh(&seq->lock);
 841        } else {
 842                warn("Failed to create subscription for {%u,%u,%u}\n",
 843                     s->seq.type, s->seq.lower, s->seq.upper);
 844        }
 845        write_unlock_bh(&tipc_nametbl_lock);
 846}
 847
 848/**
 849 * tipc_nametbl_unsubscribe - remove a subscription object from name table
 850 */
 851
 852void tipc_nametbl_unsubscribe(struct subscription *s)
 853{
 854        struct name_seq *seq;
 855
 856        write_lock_bh(&tipc_nametbl_lock);
 857        seq = nametbl_find_seq(s->seq.type);
 858        if (seq != NULL){
 859                spin_lock_bh(&seq->lock);
 860                list_del_init(&s->nameseq_list);
 861                spin_unlock_bh(&seq->lock);
 862                if ((seq->first_free == 0) && list_empty(&seq->subscriptions)) {
 863                        hlist_del_init(&seq->ns_list);
 864                        kfree(seq->sseqs);
 865                        kfree(seq);
 866                }
 867        }
 868        write_unlock_bh(&tipc_nametbl_lock);
 869}
 870
 871
 872/**
 873 * subseq_list: print specified sub-sequence contents into the given buffer
 874 */
 875
 876static void subseq_list(struct sub_seq *sseq, struct print_buf *buf, u32 depth,
 877                        u32 index)
 878{
 879        char portIdStr[27];
 880        char *scopeStr;
 881        struct publication *publ = sseq->zone_list;
 882
 883        tipc_printf(buf, "%-10u %-10u ", sseq->lower, sseq->upper);
 884
 885        if (depth == 2 || !publ) {
 886                tipc_printf(buf, "\n");
 887                return;
 888        }
 889
 890        do {
 891                sprintf (portIdStr, "<%u.%u.%u:%u>",
 892                         tipc_zone(publ->node), tipc_cluster(publ->node),
 893                         tipc_node(publ->node), publ->ref);
 894                tipc_printf(buf, "%-26s ", portIdStr);
 895                if (depth > 3) {
 896                        if (publ->node != tipc_own_addr)
 897                                scopeStr = "";
 898                        else if (publ->scope == TIPC_NODE_SCOPE)
 899                                scopeStr = "node";
 900                        else if (publ->scope == TIPC_CLUSTER_SCOPE)
 901                                scopeStr = "cluster";
 902                        else
 903                                scopeStr = "zone";
 904                        tipc_printf(buf, "%-10u %s", publ->key, scopeStr);
 905                }
 906
 907                publ = publ->zone_list_next;
 908                if (publ == sseq->zone_list)
 909                        break;
 910
 911                tipc_printf(buf, "\n%33s", " ");
 912        } while (1);
 913
 914        tipc_printf(buf, "\n");
 915}
 916
 917/**
 918 * nameseq_list: print specified name sequence contents into the given buffer
 919 */
 920
 921static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,
 922                         u32 type, u32 lowbound, u32 upbound, u32 index)
 923{
 924        struct sub_seq *sseq;
 925        char typearea[11];
 926
 927        if (seq->first_free == 0)
 928                return;
 929
 930        sprintf(typearea, "%-10u", seq->type);
 931
 932        if (depth == 1) {
 933                tipc_printf(buf, "%s\n", typearea);
 934                return;
 935        }
 936
 937        for (sseq = seq->sseqs; sseq != &seq->sseqs[seq->first_free]; sseq++) {
 938                if ((lowbound <= sseq->upper) && (upbound >= sseq->lower)) {
 939                        tipc_printf(buf, "%s ", typearea);
 940                        spin_lock_bh(&seq->lock);
 941                        subseq_list(sseq, buf, depth, index);
 942                        spin_unlock_bh(&seq->lock);
 943                        sprintf(typearea, "%10s", " ");
 944                }
 945        }
 946}
 947
 948/**
 949 * nametbl_header - print name table header into the given buffer
 950 */
 951
 952static void nametbl_header(struct print_buf *buf, u32 depth)
 953{
 954        tipc_printf(buf, "Type       ");
 955
 956        if (depth > 1)
 957                tipc_printf(buf, "Lower      Upper      ");
 958        if (depth > 2)
 959                tipc_printf(buf, "Port Identity              ");
 960        if (depth > 3)
 961                tipc_printf(buf, "Publication");
 962
 963        tipc_printf(buf, "\n-----------");
 964
 965        if (depth > 1)
 966                tipc_printf(buf, "--------------------- ");
 967        if (depth > 2)
 968                tipc_printf(buf, "-------------------------- ");
 969        if (depth > 3)
 970                tipc_printf(buf, "------------------");
 971
 972        tipc_printf(buf, "\n");
 973}
 974
 975/**
 976 * nametbl_list - print specified name table contents into the given buffer
 977 */
 978
 979static void nametbl_list(struct print_buf *buf, u32 depth_info,
 980                         u32 type, u32 lowbound, u32 upbound)
 981{
 982        struct hlist_head *seq_head;
 983        struct hlist_node *seq_node;
 984        struct name_seq *seq;
 985        int all_types;
 986        u32 depth;
 987        u32 i;
 988
 989        all_types = (depth_info & TIPC_NTQ_ALLTYPES);
 990        depth = (depth_info & ~TIPC_NTQ_ALLTYPES);
 991
 992        if (depth == 0)
 993                return;
 994
 995        if (all_types) {
 996                /* display all entries in name table to specified depth */
 997                nametbl_header(buf, depth);
 998                lowbound = 0;
 999                upbound = ~0;
1000                for (i = 0; i < tipc_nametbl_size; i++) {
1001                        seq_head = &table.types[i];
1002                        hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
1003                                nameseq_list(seq, buf, depth, seq->type,
1004                                             lowbound, upbound, i);
1005                        }
1006                }
1007        } else {
1008                /* display only the sequence that matches the specified type */
1009                if (upbound < lowbound) {
1010                        tipc_printf(buf, "invalid name sequence specified\n");
1011                        return;
1012                }
1013                nametbl_header(buf, depth);
1014                i = hash(type);
1015                seq_head = &table.types[i];
1016                hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
1017                        if (seq->type == type) {
1018                                nameseq_list(seq, buf, depth, type,
1019                                             lowbound, upbound, i);
1020                                break;
1021                        }
1022                }
1023        }
1024}
1025
#if 0
/*
 * tipc_nametbl_print - print a caption followed by the name table
 * (currently compiled out)
 *
 * The caption is printed through "%s" so that any '%' characters in it are
 * output literally instead of being interpreted as conversion specifiers.
 *
 * NOTE(review): a depth_info of 0 makes nametbl_list() return immediately,
 * so only the caption would be printed; choose a real depth before
 * re-enabling this routine.
 */
void tipc_nametbl_print(struct print_buf *buf, const char *str)
{
        tipc_printf(buf, "%s", str);
        read_lock_bh(&tipc_nametbl_lock);
        nametbl_list(buf, 0, 0, 0, 0);
        read_unlock_bh(&tipc_nametbl_lock);
}
#endif
1035
1036#define MAX_NAME_TBL_QUERY 32768
1037
1038struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
1039{
1040        struct sk_buff *buf;
1041        struct tipc_name_table_query *argv;
1042        struct tlv_desc *rep_tlv;
1043        struct print_buf b;
1044        int str_len;
1045
1046        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NAME_TBL_QUERY))
1047                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
1048
1049        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_NAME_TBL_QUERY));
1050        if (!buf)
1051                return NULL;
1052
1053        rep_tlv = (struct tlv_desc *)buf->data;
1054        tipc_printbuf_init(&b, TLV_DATA(rep_tlv), MAX_NAME_TBL_QUERY);
1055        argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area);
1056        read_lock_bh(&tipc_nametbl_lock);
1057        nametbl_list(&b, ntohl(argv->depth), ntohl(argv->type),
1058                     ntohl(argv->lowbound), ntohl(argv->upbound));
1059        read_unlock_bh(&tipc_nametbl_lock);
1060        str_len = tipc_printbuf_validate(&b);
1061
1062        skb_put(buf, TLV_SPACE(str_len));
1063        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
1064
1065        return buf;
1066}
1067
#if 0
/*
 * tipc_nametbl_dump - list the name table to TIPC_CONS (currently compiled out)
 *
 * NOTE(review): unlike tipc_nametbl_get(), this calls nametbl_list() without
 * taking tipc_nametbl_lock, and a depth_info of 0 makes nametbl_list() return
 * immediately, so re-enabling it unchanged would print nothing.
 */
void tipc_nametbl_dump(void)
{
        nametbl_list(TIPC_CONS, 0, 0, 0, 0);
}
#endif
1074
1075int tipc_nametbl_init(void)
1076{
1077        table.types = kcalloc(tipc_nametbl_size, sizeof(struct hlist_head),
1078                              GFP_ATOMIC);
1079        if (!table.types)
1080                return -ENOMEM;
1081
1082        table.local_publ_count = 0;
1083        return 0;
1084}
1085
1086void tipc_nametbl_stop(void)
1087{
1088        u32 i;
1089
1090        if (!table.types)
1091                return;
1092
1093        /* Verify name table is empty, then release it */
1094
1095        write_lock_bh(&tipc_nametbl_lock);
1096        for (i = 0; i < tipc_nametbl_size; i++) {
1097                if (!hlist_empty(&table.types[i]))
1098                        err("tipc_nametbl_stop(): hash chain %u is non-null\n", i);
1099        }
1100        kfree(table.types);
1101        table.types = NULL;
1102        write_unlock_bh(&tipc_nametbl_lock);
1103}
1104
1105