linux/net/tipc/link.c
<<
>>
Prefs
   1/*
   2 * net/tipc/link.c: TIPC link code
   3 *
   4 * Copyright (c) 1996-2007, Ericsson AB
   5 * Copyright (c) 2004-2007, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "dbg.h"
  39#include "link.h"
  40#include "net.h"
  41#include "node.h"
  42#include "port.h"
  43#include "addr.h"
  44#include "node_subscr.h"
  45#include "name_distr.h"
  46#include "bearer.h"
  47#include "name_table.h"
  48#include "discover.h"
  49#include "config.h"
  50#include "bcast.h"
  51
  52
  53/*
  54 * Out-of-range value for link session numbers
  55 */
  56
  57#define INVALID_SESSION 0x10000
  58
  59/*
  60 * Limit for deferred reception queue:
  61 */
  62
  63#define DEF_QUEUE_LIMIT 256u
  64
  65/*
  66 * Link state events:
  67 */
  68
  69#define  STARTING_EVT    856384768      /* link processing trigger */
  70#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
  71#define  TIMEOUT_EVT     560817u        /* link timer expired */
  72
  73/*
  74 * The following two 'message types' is really just implementation
  75 * data conveniently stored in the message header.
  76 * They must not be considered part of the protocol
  77 */
  78#define OPEN_MSG   0
  79#define CLOSED_MSG 1
  80
  81/*
  82 * State value stored in 'exp_msg_count'
  83 */
  84
  85#define START_CHANGEOVER 100000u
  86
  87/**
  88 * struct link_name - deconstructed link name
  89 * @addr_local: network address of node at this end
  90 * @if_local: name of interface at this end
  91 * @addr_peer: network address of node at far end
  92 * @if_peer: name of interface at far end
  93 */
  94
  95struct link_name {
  96        u32 addr_local;
  97        char if_local[TIPC_MAX_IF_NAME];
  98        u32 addr_peer;
  99        char if_peer[TIPC_MAX_IF_NAME];
 100};
 101
 102#if 0
 103
 104/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
 105
 106/**
 107 * struct link_event - link up/down event notification
 108 */
 109
 110struct link_event {
 111        u32 addr;
 112        int up;
 113        void (*fcn)(u32, char *, int);
 114        char name[TIPC_MAX_LINK_NAME];
 115};
 116
 117#endif
 118
 119static void link_handle_out_of_seq_msg(struct link *l_ptr,
 120                                       struct sk_buff *buf);
 121static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
 122static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
 123static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
 124static int  link_send_sections_long(struct port *sender,
 125                                    struct iovec const *msg_sect,
 126                                    u32 num_sect, u32 destnode);
 127static void link_check_defragm_bufs(struct link *l_ptr);
 128static void link_state_event(struct link *l_ptr, u32 event);
 129static void link_reset_statistics(struct link *l_ptr);
 130static void link_print(struct link *l_ptr, struct print_buf *buf,
 131                       const char *str);
 132
 133/*
 134 * Debugging code used by link routines only
 135 *
 136 * When debugging link problems on a system that has multiple links,
 137 * the standard TIPC debugging routines may not be useful since they
 138 * allow the output from multiple links to be intermixed.  For this reason
 139 * routines of the form "dbg_link_XXX()" have been created that will capture
 140 * debug info into a link's personal print buffer, which can then be dumped
 141 * into the TIPC system log (TIPC_LOG) upon request.
 142 *
 143 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
 144 * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
 145 * the dbg_link_XXX() routines simply send their output to the standard
 146 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
 147 * when there is only a single link in the system being debugged.
 148 *
 149 * Notes:
 150 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
 151 * - "l_ptr" must be valid when using dbg_link_XXX() macros
 152 */
 153
 154#define LINK_LOG_BUF_SIZE 0
 155
 156#define dbg_link(fmt, arg...) \
 157        do { \
 158                if (LINK_LOG_BUF_SIZE) \
 159                        tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
 160        } while (0)
 161#define dbg_link_msg(msg, txt) \
 162        do { \
 163                if (LINK_LOG_BUF_SIZE) \
 164                        tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
 165        } while (0)
 166#define dbg_link_state(txt) \
 167        do { \
 168                if (LINK_LOG_BUF_SIZE) \
 169                        link_print(l_ptr, &l_ptr->print_buf, txt); \
 170        } while (0)
 171#define dbg_link_dump() do { \
 172        if (LINK_LOG_BUF_SIZE) { \
 173                tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
 174                tipc_printbuf_move(LOG, &l_ptr->print_buf); \
 175        } \
 176} while (0)
 177
 178static void dbg_print_link(struct link *l_ptr, const char *str)
 179{
 180        if (DBG_OUTPUT != TIPC_NULL)
 181                link_print(l_ptr, DBG_OUTPUT, str);
 182}
 183
 184static void dbg_print_buf_chain(struct sk_buff *root_buf)
 185{
 186        if (DBG_OUTPUT != TIPC_NULL) {
 187                struct sk_buff *buf = root_buf;
 188
 189                while (buf) {
 190                        msg_dbg(buf_msg(buf), "In chain: ");
 191                        buf = buf->next;
 192                }
 193        }
 194}
 195
 196/*
 197 *  Simple link routines
 198 */
 199
 200static unsigned int align(unsigned int i)
 201{
 202        return (i + 3) & ~3u;
 203}
 204
 205static void link_init_max_pkt(struct link *l_ptr)
 206{
 207        u32 max_pkt;
 208
 209        max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
 210        if (max_pkt > MAX_MSG_SIZE)
 211                max_pkt = MAX_MSG_SIZE;
 212
 213        l_ptr->max_pkt_target = max_pkt;
 214        if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
 215                l_ptr->max_pkt = l_ptr->max_pkt_target;
 216        else
 217                l_ptr->max_pkt = MAX_PKT_DEFAULT;
 218
 219        l_ptr->max_pkt_probes = 0;
 220}
 221
 222static u32 link_next_sent(struct link *l_ptr)
 223{
 224        if (l_ptr->next_out)
 225                return msg_seqno(buf_msg(l_ptr->next_out));
 226        return mod(l_ptr->next_out_no);
 227}
 228
 229static u32 link_last_sent(struct link *l_ptr)
 230{
 231        return mod(link_next_sent(l_ptr) - 1);
 232}
 233
 234/*
 235 *  Simple non-static link routines (i.e. referenced outside this file)
 236 */
 237
 238int tipc_link_is_up(struct link *l_ptr)
 239{
 240        if (!l_ptr)
 241                return 0;
 242        return (link_working_working(l_ptr) || link_working_unknown(l_ptr));
 243}
 244
 245int tipc_link_is_active(struct link *l_ptr)
 246{
 247        return ((l_ptr->owner->active_links[0] == l_ptr) ||
 248                (l_ptr->owner->active_links[1] == l_ptr));
 249}
 250
 251/**
 252 * link_name_validate - validate & (optionally) deconstruct link name
 253 * @name - ptr to link name string
 254 * @name_parts - ptr to area for link name components (or NULL if not needed)
 255 *
 256 * Returns 1 if link name is valid, otherwise 0.
 257 */
 258
 259static int link_name_validate(const char *name, struct link_name *name_parts)
 260{
 261        char name_copy[TIPC_MAX_LINK_NAME];
 262        char *addr_local;
 263        char *if_local;
 264        char *addr_peer;
 265        char *if_peer;
 266        char dummy;
 267        u32 z_local, c_local, n_local;
 268        u32 z_peer, c_peer, n_peer;
 269        u32 if_local_len;
 270        u32 if_peer_len;
 271
 272        /* copy link name & ensure length is OK */
 273
 274        name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
 275        /* need above in case non-Posix strncpy() doesn't pad with nulls */
 276        strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
 277        if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
 278                return 0;
 279
 280        /* ensure all component parts of link name are present */
 281
 282        addr_local = name_copy;
 283        if ((if_local = strchr(addr_local, ':')) == NULL)
 284                return 0;
 285        *(if_local++) = 0;
 286        if ((addr_peer = strchr(if_local, '-')) == NULL)
 287                return 0;
 288        *(addr_peer++) = 0;
 289        if_local_len = addr_peer - if_local;
 290        if ((if_peer = strchr(addr_peer, ':')) == NULL)
 291                return 0;
 292        *(if_peer++) = 0;
 293        if_peer_len = strlen(if_peer) + 1;
 294
 295        /* validate component parts of link name */
 296
 297        if ((sscanf(addr_local, "%u.%u.%u%c",
 298                    &z_local, &c_local, &n_local, &dummy) != 3) ||
 299            (sscanf(addr_peer, "%u.%u.%u%c",
 300                    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
 301            (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
 302            (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
 303            (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
 304            (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
 305            (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
 306            (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
 307                return 0;
 308
 309        /* return link name components, if necessary */
 310
 311        if (name_parts) {
 312                name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
 313                strcpy(name_parts->if_local, if_local);
 314                name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
 315                strcpy(name_parts->if_peer, if_peer);
 316        }
 317        return 1;
 318}
 319
 320/**
 321 * link_timeout - handle expiration of link timer
 322 * @l_ptr: pointer to link
 323 *
 324 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
 325 * with tipc_link_delete().  (There is no risk that the node will be deleted by
 326 * another thread because tipc_link_delete() always cancels the link timer before
 327 * tipc_node_delete() is called.)
 328 */
 329
 330static void link_timeout(struct link *l_ptr)
 331{
 332        tipc_node_lock(l_ptr->owner);
 333
 334        /* update counters used in statistical profiling of send traffic */
 335
 336        l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
 337        l_ptr->stats.queue_sz_counts++;
 338
 339        if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
 340                l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
 341
 342        if (l_ptr->first_out) {
 343                struct tipc_msg *msg = buf_msg(l_ptr->first_out);
 344                u32 length = msg_size(msg);
 345
 346                if ((msg_user(msg) == MSG_FRAGMENTER) &&
 347                    (msg_type(msg) == FIRST_FRAGMENT)) {
 348                        length = msg_size(msg_get_wrapped(msg));
 349                }
 350                if (length) {
 351                        l_ptr->stats.msg_lengths_total += length;
 352                        l_ptr->stats.msg_length_counts++;
 353                        if (length <= 64)
 354                                l_ptr->stats.msg_length_profile[0]++;
 355                        else if (length <= 256)
 356                                l_ptr->stats.msg_length_profile[1]++;
 357                        else if (length <= 1024)
 358                                l_ptr->stats.msg_length_profile[2]++;
 359                        else if (length <= 4096)
 360                                l_ptr->stats.msg_length_profile[3]++;
 361                        else if (length <= 16384)
 362                                l_ptr->stats.msg_length_profile[4]++;
 363                        else if (length <= 32768)
 364                                l_ptr->stats.msg_length_profile[5]++;
 365                        else
 366                                l_ptr->stats.msg_length_profile[6]++;
 367                }
 368        }
 369
 370        /* do all other link processing performed on a periodic basis */
 371
 372        link_check_defragm_bufs(l_ptr);
 373
 374        link_state_event(l_ptr, TIMEOUT_EVT);
 375
 376        if (l_ptr->next_out)
 377                tipc_link_push_queue(l_ptr);
 378
 379        tipc_node_unlock(l_ptr->owner);
 380}
 381
 382static void link_set_timer(struct link *l_ptr, u32 time)
 383{
 384        k_start_timer(&l_ptr->timer, time);
 385}
 386
 387/**
 388 * tipc_link_create - create a new link
 389 * @b_ptr: pointer to associated bearer
 390 * @peer: network address of node at other end of link
 391 * @media_addr: media address to use when sending messages over link
 392 *
 393 * Returns pointer to link.
 394 */
 395
 396struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
 397                              const struct tipc_media_addr *media_addr)
 398{
 399        struct link *l_ptr;
 400        struct tipc_msg *msg;
 401        char *if_name;
 402
 403        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 404        if (!l_ptr) {
 405                warn("Link creation failed, no memory\n");
 406                return NULL;
 407        }
 408
 409        if (LINK_LOG_BUF_SIZE) {
 410                char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
 411
 412                if (!pb) {
 413                        kfree(l_ptr);
 414                        warn("Link creation failed, no memory for print buffer\n");
 415                        return NULL;
 416                }
 417                tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
 418        }
 419
 420        l_ptr->addr = peer;
 421        if_name = strchr(b_ptr->publ.name, ':') + 1;
 422        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
 423                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
 424                tipc_node(tipc_own_addr),
 425                if_name,
 426                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 427                /* note: peer i/f is appended to link name by reset/activate */
 428        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 429        l_ptr->checkpoint = 1;
 430        l_ptr->b_ptr = b_ptr;
 431        link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
 432        l_ptr->state = RESET_UNKNOWN;
 433
 434        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 435        msg = l_ptr->pmsg;
 436        tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
 437        msg_set_size(msg, sizeof(l_ptr->proto_msg));
 438        msg_set_session(msg, (tipc_random & 0xffff));
 439        msg_set_bearer_id(msg, b_ptr->identity);
 440        strcpy((char *)msg_data(msg), if_name);
 441
 442        l_ptr->priority = b_ptr->priority;
 443        tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
 444
 445        link_init_max_pkt(l_ptr);
 446
 447        l_ptr->next_out_no = 1;
 448        INIT_LIST_HEAD(&l_ptr->waiting_ports);
 449
 450        link_reset_statistics(l_ptr);
 451
 452        l_ptr->owner = tipc_node_attach_link(l_ptr);
 453        if (!l_ptr->owner) {
 454                if (LINK_LOG_BUF_SIZE)
 455                        kfree(l_ptr->print_buf.buf);
 456                kfree(l_ptr);
 457                return NULL;
 458        }
 459
 460        k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
 461        list_add_tail(&l_ptr->link_list, &b_ptr->links);
 462        tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr);
 463
 464        dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
 465            l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
 466
 467        return l_ptr;
 468}
 469
 470/**
 471 * tipc_link_delete - delete a link
 472 * @l_ptr: pointer to link
 473 *
 474 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
 475 * This routine must not grab the node lock until after link timer cancellation
 476 * to avoid a potential deadlock situation.
 477 */
 478
 479void tipc_link_delete(struct link *l_ptr)
 480{
 481        if (!l_ptr) {
 482                err("Attempt to delete non-existent link\n");
 483                return;
 484        }
 485
 486        dbg("tipc_link_delete()\n");
 487
 488        k_cancel_timer(&l_ptr->timer);
 489
 490        tipc_node_lock(l_ptr->owner);
 491        tipc_link_reset(l_ptr);
 492        tipc_node_detach_link(l_ptr->owner, l_ptr);
 493        tipc_link_stop(l_ptr);
 494        list_del_init(&l_ptr->link_list);
 495        if (LINK_LOG_BUF_SIZE)
 496                kfree(l_ptr->print_buf.buf);
 497        tipc_node_unlock(l_ptr->owner);
 498        k_term_timer(&l_ptr->timer);
 499        kfree(l_ptr);
 500}
 501
 502void tipc_link_start(struct link *l_ptr)
 503{
 504        dbg("tipc_link_start %x\n", l_ptr);
 505        link_state_event(l_ptr, STARTING_EVT);
 506}
 507
 508/**
 509 * link_schedule_port - schedule port for deferred sending
 510 * @l_ptr: pointer to link
 511 * @origport: reference to sending port
 512 * @sz: amount of data to be sent
 513 *
 514 * Schedules port for renewed sending of messages after link congestion
 515 * has abated.
 516 */
 517
 518static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
 519{
 520        struct port *p_ptr;
 521
 522        spin_lock_bh(&tipc_port_list_lock);
 523        p_ptr = tipc_port_lock(origport);
 524        if (p_ptr) {
 525                if (!p_ptr->wakeup)
 526                        goto exit;
 527                if (!list_empty(&p_ptr->wait_list))
 528                        goto exit;
 529                p_ptr->publ.congested = 1;
 530                p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
 531                list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 532                l_ptr->stats.link_congs++;
 533exit:
 534                tipc_port_unlock(p_ptr);
 535        }
 536        spin_unlock_bh(&tipc_port_list_lock);
 537        return -ELINKCONG;
 538}
 539
 540void tipc_link_wakeup_ports(struct link *l_ptr, int all)
 541{
 542        struct port *p_ptr;
 543        struct port *temp_p_ptr;
 544        int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 545
 546        if (all)
 547                win = 100000;
 548        if (win <= 0)
 549                return;
 550        if (!spin_trylock_bh(&tipc_port_list_lock))
 551                return;
 552        if (link_congested(l_ptr))
 553                goto exit;
 554        list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
 555                                 wait_list) {
 556                if (win <= 0)
 557                        break;
 558                list_del_init(&p_ptr->wait_list);
 559                spin_lock_bh(p_ptr->publ.lock);
 560                p_ptr->publ.congested = 0;
 561                p_ptr->wakeup(&p_ptr->publ);
 562                win -= p_ptr->waiting_pkts;
 563                spin_unlock_bh(p_ptr->publ.lock);
 564        }
 565
 566exit:
 567        spin_unlock_bh(&tipc_port_list_lock);
 568}
 569
 570/**
 571 * link_release_outqueue - purge link's outbound message queue
 572 * @l_ptr: pointer to link
 573 */
 574
 575static void link_release_outqueue(struct link *l_ptr)
 576{
 577        struct sk_buff *buf = l_ptr->first_out;
 578        struct sk_buff *next;
 579
 580        while (buf) {
 581                next = buf->next;
 582                buf_discard(buf);
 583                buf = next;
 584        }
 585        l_ptr->first_out = NULL;
 586        l_ptr->out_queue_size = 0;
 587}
 588
 589/**
 590 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 591 * @l_ptr: pointer to link
 592 */
 593
 594void tipc_link_reset_fragments(struct link *l_ptr)
 595{
 596        struct sk_buff *buf = l_ptr->defragm_buf;
 597        struct sk_buff *next;
 598
 599        while (buf) {
 600                next = buf->next;
 601                buf_discard(buf);
 602                buf = next;
 603        }
 604        l_ptr->defragm_buf = NULL;
 605}
 606
 607/**
 608 * tipc_link_stop - purge all inbound and outbound messages associated with link
 609 * @l_ptr: pointer to link
 610 */
 611
 612void tipc_link_stop(struct link *l_ptr)
 613{
 614        struct sk_buff *buf;
 615        struct sk_buff *next;
 616
 617        buf = l_ptr->oldest_deferred_in;
 618        while (buf) {
 619                next = buf->next;
 620                buf_discard(buf);
 621                buf = next;
 622        }
 623
 624        buf = l_ptr->first_out;
 625        while (buf) {
 626                next = buf->next;
 627                buf_discard(buf);
 628                buf = next;
 629        }
 630
 631        tipc_link_reset_fragments(l_ptr);
 632
 633        buf_discard(l_ptr->proto_msg_queue);
 634        l_ptr->proto_msg_queue = NULL;
 635}
 636
 637#if 0
 638
 639/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
 640
 641static void link_recv_event(struct link_event *ev)
 642{
 643        ev->fcn(ev->addr, ev->name, ev->up);
 644        kfree(ev);
 645}
 646
 647static void link_send_event(void (*fcn)(u32 a, char *n, int up),
 648                            struct link *l_ptr, int up)
 649{
 650        struct link_event *ev;
 651
 652        ev = kmalloc(sizeof(*ev), GFP_ATOMIC);
 653        if (!ev) {
 654                warn("Link event allocation failure\n");
 655                return;
 656        }
 657        ev->addr = l_ptr->addr;
 658        ev->up = up;
 659        ev->fcn = fcn;
 660        memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME);
 661        tipc_k_signal((Handler)link_recv_event, (unsigned long)ev);
 662}
 663
 664#else
 665
 666#define link_send_event(fcn, l_ptr, up) do { } while (0)
 667
 668#endif
 669
 670void tipc_link_reset(struct link *l_ptr)
 671{
 672        struct sk_buff *buf;
 673        u32 prev_state = l_ptr->state;
 674        u32 checkpoint = l_ptr->next_in_no;
 675        int was_active_link = tipc_link_is_active(l_ptr);
 676
 677        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 678
 679        /* Link is down, accept any session */
 680        l_ptr->peer_session = INVALID_SESSION;
 681
 682        /* Prepare for max packet size negotiation */
 683        link_init_max_pkt(l_ptr);
 684
 685        l_ptr->state = RESET_UNKNOWN;
 686        dbg_link_state("Resetting Link\n");
 687
 688        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
 689                return;
 690
 691        tipc_node_link_down(l_ptr->owner, l_ptr);
 692        tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 693#if 0
 694        tipc_printf(TIPC_CONS, "\nReset link <%s>\n", l_ptr->name);
 695        dbg_link_dump();
 696#endif
 697        if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
 698            l_ptr->owner->permit_changeover) {
 699                l_ptr->reset_checkpoint = checkpoint;
 700                l_ptr->exp_msg_count = START_CHANGEOVER;
 701        }
 702
 703        /* Clean up all queues: */
 704
 705        link_release_outqueue(l_ptr);
 706        buf_discard(l_ptr->proto_msg_queue);
 707        l_ptr->proto_msg_queue = NULL;
 708        buf = l_ptr->oldest_deferred_in;
 709        while (buf) {
 710                struct sk_buff *next = buf->next;
 711                buf_discard(buf);
 712                buf = next;
 713        }
 714        if (!list_empty(&l_ptr->waiting_ports))
 715                tipc_link_wakeup_ports(l_ptr, 1);
 716
 717        l_ptr->retransm_queue_head = 0;
 718        l_ptr->retransm_queue_size = 0;
 719        l_ptr->last_out = NULL;
 720        l_ptr->first_out = NULL;
 721        l_ptr->next_out = NULL;
 722        l_ptr->unacked_window = 0;
 723        l_ptr->checkpoint = 1;
 724        l_ptr->next_out_no = 1;
 725        l_ptr->deferred_inqueue_sz = 0;
 726        l_ptr->oldest_deferred_in = NULL;
 727        l_ptr->newest_deferred_in = NULL;
 728        l_ptr->fsm_msg_cnt = 0;
 729        l_ptr->stale_count = 0;
 730        link_reset_statistics(l_ptr);
 731
 732        link_send_event(tipc_cfg_link_event, l_ptr, 0);
 733        if (!in_own_cluster(l_ptr->addr))
 734                link_send_event(tipc_disc_link_event, l_ptr, 0);
 735}
 736
 737
 738static void link_activate(struct link *l_ptr)
 739{
 740        l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
 741        tipc_node_link_up(l_ptr->owner, l_ptr);
 742        tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
 743        link_send_event(tipc_cfg_link_event, l_ptr, 1);
 744        if (!in_own_cluster(l_ptr->addr))
 745                link_send_event(tipc_disc_link_event, l_ptr, 1);
 746}
 747
 748/**
 749 * link_state_event - link finite state machine
 750 * @l_ptr: pointer to link
 751 * @event: state machine event to process
 752 */
 753
 754static void link_state_event(struct link *l_ptr, unsigned event)
 755{
 756        struct link *other;
 757        u32 cont_intv = l_ptr->continuity_interval;
 758
 759        if (!l_ptr->started && (event != STARTING_EVT))
 760                return;         /* Not yet. */
 761
 762        if (link_blocked(l_ptr)) {
 763                if (event == TIMEOUT_EVT) {
 764                        link_set_timer(l_ptr, cont_intv);
 765                }
 766                return;   /* Changeover going on */
 767        }
 768        dbg_link("STATE_EV: <%s> ", l_ptr->name);
 769
 770        switch (l_ptr->state) {
 771        case WORKING_WORKING:
 772                dbg_link("WW/");
 773                switch (event) {
 774                case TRAFFIC_MSG_EVT:
 775                        dbg_link("TRF-");
 776                        /* fall through */
 777                case ACTIVATE_MSG:
 778                        dbg_link("ACT\n");
 779                        break;
 780                case TIMEOUT_EVT:
 781                        dbg_link("TIM ");
 782                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 783                                l_ptr->checkpoint = l_ptr->next_in_no;
 784                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 785                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 786                                                                 0, 0, 0, 0, 0);
 787                                        l_ptr->fsm_msg_cnt++;
 788                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
 789                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 790                                                                 1, 0, 0, 0, 0);
 791                                        l_ptr->fsm_msg_cnt++;
 792                                }
 793                                link_set_timer(l_ptr, cont_intv);
 794                                break;
 795                        }
 796                        dbg_link(" -> WU\n");
 797                        l_ptr->state = WORKING_UNKNOWN;
 798                        l_ptr->fsm_msg_cnt = 0;
 799                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 800                        l_ptr->fsm_msg_cnt++;
 801                        link_set_timer(l_ptr, cont_intv / 4);
 802                        break;
 803                case RESET_MSG:
 804                        dbg_link("RES -> RR\n");
 805                        info("Resetting link <%s>, requested by peer\n",
 806                             l_ptr->name);
 807                        tipc_link_reset(l_ptr);
 808                        l_ptr->state = RESET_RESET;
 809                        l_ptr->fsm_msg_cnt = 0;
 810                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 811                        l_ptr->fsm_msg_cnt++;
 812                        link_set_timer(l_ptr, cont_intv);
 813                        break;
 814                default:
 815                        err("Unknown link event %u in WW state\n", event);
 816                }
 817                break;
 818        case WORKING_UNKNOWN:
 819                dbg_link("WU/");
 820                switch (event) {
 821                case TRAFFIC_MSG_EVT:
 822                        dbg_link("TRF-");
 823                case ACTIVATE_MSG:
 824                        dbg_link("ACT -> WW\n");
 825                        l_ptr->state = WORKING_WORKING;
 826                        l_ptr->fsm_msg_cnt = 0;
 827                        link_set_timer(l_ptr, cont_intv);
 828                        break;
 829                case RESET_MSG:
 830                        dbg_link("RES -> RR\n");
 831                        info("Resetting link <%s>, requested by peer "
 832                             "while probing\n", l_ptr->name);
 833                        tipc_link_reset(l_ptr);
 834                        l_ptr->state = RESET_RESET;
 835                        l_ptr->fsm_msg_cnt = 0;
 836                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 837                        l_ptr->fsm_msg_cnt++;
 838                        link_set_timer(l_ptr, cont_intv);
 839                        break;
 840                case TIMEOUT_EVT:
 841                        dbg_link("TIM ");
 842                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 843                                dbg_link("-> WW\n");
 844                                l_ptr->state = WORKING_WORKING;
 845                                l_ptr->fsm_msg_cnt = 0;
 846                                l_ptr->checkpoint = l_ptr->next_in_no;
 847                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 848                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 849                                                                 0, 0, 0, 0, 0);
 850                                        l_ptr->fsm_msg_cnt++;
 851                                }
 852                                link_set_timer(l_ptr, cont_intv);
 853                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
 854                                dbg_link("Probing %u/%u,timer = %u ms)\n",
 855                                         l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
 856                                         cont_intv / 4);
 857                                tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 858                                                         1, 0, 0, 0, 0);
 859                                l_ptr->fsm_msg_cnt++;
 860                                link_set_timer(l_ptr, cont_intv / 4);
 861                        } else {        /* Link has failed */
 862                                dbg_link("-> RU (%u probes unanswered)\n",
 863                                         l_ptr->fsm_msg_cnt);
 864                                warn("Resetting link <%s>, peer not responding\n",
 865                                     l_ptr->name);
 866                                tipc_link_reset(l_ptr);
 867                                l_ptr->state = RESET_UNKNOWN;
 868                                l_ptr->fsm_msg_cnt = 0;
 869                                tipc_link_send_proto_msg(l_ptr, RESET_MSG,
 870                                                         0, 0, 0, 0, 0);
 871                                l_ptr->fsm_msg_cnt++;
 872                                link_set_timer(l_ptr, cont_intv);
 873                        }
 874                        break;
 875                default:
 876                        err("Unknown link event %u in WU state\n", event);
 877                }
 878                break;
 879        case RESET_UNKNOWN:
 880                dbg_link("RU/");
 881                switch (event) {
 882                case TRAFFIC_MSG_EVT:
 883                        dbg_link("TRF-\n");
 884                        break;
 885                case ACTIVATE_MSG:
 886                        other = l_ptr->owner->active_links[0];
 887                        if (other && link_working_unknown(other)) {
 888                                dbg_link("ACT\n");
 889                                break;
 890                        }
 891                        dbg_link("ACT -> WW\n");
 892                        l_ptr->state = WORKING_WORKING;
 893                        l_ptr->fsm_msg_cnt = 0;
 894                        link_activate(l_ptr);
 895                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 896                        l_ptr->fsm_msg_cnt++;
 897                        link_set_timer(l_ptr, cont_intv);
 898                        break;
 899                case RESET_MSG:
 900                        dbg_link("RES\n");
 901                        dbg_link(" -> RR\n");
 902                        l_ptr->state = RESET_RESET;
 903                        l_ptr->fsm_msg_cnt = 0;
 904                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
 905                        l_ptr->fsm_msg_cnt++;
 906                        link_set_timer(l_ptr, cont_intv);
 907                        break;
 908                case STARTING_EVT:
 909                        dbg_link("START-");
 910                        l_ptr->started = 1;
 911                        /* fall through */
 912                case TIMEOUT_EVT:
 913                        dbg_link("TIM\n");
 914                        tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
 915                        l_ptr->fsm_msg_cnt++;
 916                        link_set_timer(l_ptr, cont_intv);
 917                        break;
 918                default:
 919                        err("Unknown link event %u in RU state\n", event);
 920                }
 921                break;
 922        case RESET_RESET:
 923                dbg_link("RR/ ");
 924                switch (event) {
 925                case TRAFFIC_MSG_EVT:
 926                        dbg_link("TRF-");
 927                        /* fall through */
 928                case ACTIVATE_MSG:
 929                        other = l_ptr->owner->active_links[0];
 930                        if (other && link_working_unknown(other)) {
 931                                dbg_link("ACT\n");
 932                                break;
 933                        }
 934                        dbg_link("ACT -> WW\n");
 935                        l_ptr->state = WORKING_WORKING;
 936                        l_ptr->fsm_msg_cnt = 0;
 937                        link_activate(l_ptr);
 938                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 939                        l_ptr->fsm_msg_cnt++;
 940                        link_set_timer(l_ptr, cont_intv);
 941                        break;
 942                case RESET_MSG:
 943                        dbg_link("RES\n");
 944                        break;
 945                case TIMEOUT_EVT:
 946                        dbg_link("TIM\n");
 947                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 948                        l_ptr->fsm_msg_cnt++;
 949                        link_set_timer(l_ptr, cont_intv);
 950                        dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
 951                        break;
 952                default:
 953                        err("Unknown link event %u in RR state\n", event);
 954                }
 955                break;
 956        default:
 957                err("Unknown link state %u/%u\n", l_ptr->state, event);
 958        }
 959}
 960
 961/*
 962 * link_bundle_buf(): Append contents of a buffer to
 963 * the tail of an existing one.
 964 */
 965
 966static int link_bundle_buf(struct link *l_ptr,
 967                           struct sk_buff *bundler,
 968                           struct sk_buff *buf)
 969{
 970        struct tipc_msg *bundler_msg = buf_msg(bundler);
 971        struct tipc_msg *msg = buf_msg(buf);
 972        u32 size = msg_size(msg);
 973        u32 bundle_size = msg_size(bundler_msg);
 974        u32 to_pos = align(bundle_size);
 975        u32 pad = to_pos - bundle_size;
 976
 977        if (msg_user(bundler_msg) != MSG_BUNDLER)
 978                return 0;
 979        if (msg_type(bundler_msg) != OPEN_MSG)
 980                return 0;
 981        if (skb_tailroom(bundler) < (pad + size))
 982                return 0;
 983        if (l_ptr->max_pkt < (to_pos + size))
 984                return 0;
 985
 986        skb_put(bundler, pad + size);
 987        skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
 988        msg_set_size(bundler_msg, to_pos + size);
 989        msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
 990        dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
 991            msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
 992        msg_dbg(msg, "PACKD:");
 993        buf_discard(buf);
 994        l_ptr->stats.sent_bundled++;
 995        return 1;
 996}
 997
 998static void link_add_to_outqueue(struct link *l_ptr,
 999                                 struct sk_buff *buf,
1000                                 struct tipc_msg *msg)
1001{
1002        u32 ack = mod(l_ptr->next_in_no - 1);
1003        u32 seqno = mod(l_ptr->next_out_no++);
1004
1005        msg_set_word(msg, 2, ((ack << 16) | seqno));
1006        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1007        buf->next = NULL;
1008        if (l_ptr->first_out) {
1009                l_ptr->last_out->next = buf;
1010                l_ptr->last_out = buf;
1011        } else
1012                l_ptr->first_out = l_ptr->last_out = buf;
1013        l_ptr->out_queue_size++;
1014}
1015
1016/*
1017 * tipc_link_send_buf() is the 'full path' for messages, called from
1018 * inside TIPC when the 'fast path' in tipc_send_buf
1019 * has failed, and from link_send()
1020 */
1021
1022int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
1023{
1024        struct tipc_msg *msg = buf_msg(buf);
1025        u32 size = msg_size(msg);
1026        u32 dsz = msg_data_sz(msg);
1027        u32 queue_size = l_ptr->out_queue_size;
1028        u32 imp = tipc_msg_tot_importance(msg);
1029        u32 queue_limit = l_ptr->queue_limit[imp];
1030        u32 max_packet = l_ptr->max_pkt;
1031
1032        msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
1033
1034        /* Match msg importance against queue limits: */
1035
1036        if (unlikely(queue_size >= queue_limit)) {
1037                if (imp <= TIPC_CRITICAL_IMPORTANCE) {
1038                        return link_schedule_port(l_ptr, msg_origport(msg),
1039                                                  size);
1040                }
1041                msg_dbg(msg, "TIPC: Congestion, throwing away\n");
1042                buf_discard(buf);
1043                if (imp > CONN_MANAGER) {
1044                        warn("Resetting link <%s>, send queue full", l_ptr->name);
1045                        tipc_link_reset(l_ptr);
1046                }
1047                return dsz;
1048        }
1049
1050        /* Fragmentation needed ? */
1051
1052        if (size > max_packet)
1053                return tipc_link_send_long_buf(l_ptr, buf);
1054
1055        /* Packet can be queued or sent: */
1056
1057        if (queue_size > l_ptr->stats.max_queue_sz)
1058                l_ptr->stats.max_queue_sz = queue_size;
1059
1060        if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
1061                   !link_congested(l_ptr))) {
1062                link_add_to_outqueue(l_ptr, buf, msg);
1063
1064                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1065                        l_ptr->unacked_window = 0;
1066                } else {
1067                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1068                        l_ptr->stats.bearer_congs++;
1069                        l_ptr->next_out = buf;
1070                }
1071                return dsz;
1072        }
1073        /* Congestion: can message be bundled ?: */
1074
1075        if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1076            (msg_user(msg) != MSG_FRAGMENTER)) {
1077
1078                /* Try adding message to an existing bundle */
1079
1080                if (l_ptr->next_out &&
1081                    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1082                        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1083                        return dsz;
1084                }
1085
1086                /* Try creating a new bundle */
1087
1088                if (size <= max_packet * 2 / 3) {
1089                        struct sk_buff *bundler = buf_acquire(max_packet);
1090                        struct tipc_msg bundler_hdr;
1091
1092                        if (bundler) {
1093                                tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1094                                         INT_H_SIZE, l_ptr->addr);
1095                                skb_copy_to_linear_data(bundler, &bundler_hdr,
1096                                                        INT_H_SIZE);
1097                                skb_trim(bundler, INT_H_SIZE);
1098                                link_bundle_buf(l_ptr, bundler, buf);
1099                                buf = bundler;
1100                                msg = buf_msg(buf);
1101                                l_ptr->stats.sent_bundles++;
1102                        }
1103                }
1104        }
1105        if (!l_ptr->next_out)
1106                l_ptr->next_out = buf;
1107        link_add_to_outqueue(l_ptr, buf, msg);
1108        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1109        return dsz;
1110}
1111
1112/*
1113 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1114 * not been selected yet, and the the owner node is not locked
1115 * Called by TIPC internal users, e.g. the name distributor
1116 */
1117
1118int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1119{
1120        struct link *l_ptr;
1121        struct tipc_node *n_ptr;
1122        int res = -ELINKCONG;
1123
1124        read_lock_bh(&tipc_net_lock);
1125        n_ptr = tipc_node_select(dest, selector);
1126        if (n_ptr) {
1127                tipc_node_lock(n_ptr);
1128                l_ptr = n_ptr->active_links[selector & 1];
1129                if (l_ptr) {
1130                        dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1131                        res = tipc_link_send_buf(l_ptr, buf);
1132                } else {
1133                        dbg("Attempt to send msg to unreachable node:\n");
1134                        msg_dbg(buf_msg(buf),">>>");
1135                        buf_discard(buf);
1136                }
1137                tipc_node_unlock(n_ptr);
1138        } else {
1139                dbg("Attempt to send msg to unknown node:\n");
1140                msg_dbg(buf_msg(buf),">>>");
1141                buf_discard(buf);
1142        }
1143        read_unlock_bh(&tipc_net_lock);
1144        return res;
1145}
1146
1147/*
1148 * link_send_buf_fast: Entry for data messages where the
1149 * destination link is known and the header is complete,
1150 * inclusive total message length. Very time critical.
1151 * Link is locked. Returns user data length.
1152 */
1153
1154static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1155                              u32 *used_max_pkt)
1156{
1157        struct tipc_msg *msg = buf_msg(buf);
1158        int res = msg_data_sz(msg);
1159
1160        if (likely(!link_congested(l_ptr))) {
1161                if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1162                        if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1163                                link_add_to_outqueue(l_ptr, buf, msg);
1164                                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1165                                                            &l_ptr->media_addr))) {
1166                                        l_ptr->unacked_window = 0;
1167                                        msg_dbg(msg,"SENT_FAST:");
1168                                        return res;
1169                                }
1170                                dbg("failed sent fast...\n");
1171                                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1172                                l_ptr->stats.bearer_congs++;
1173                                l_ptr->next_out = buf;
1174                                return res;
1175                        }
1176                }
1177                else
1178                        *used_max_pkt = l_ptr->max_pkt;
1179        }
1180        return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1181}
1182
1183/*
1184 * tipc_send_buf_fast: Entry for data messages where the
1185 * destination node is known and the header is complete,
1186 * inclusive total message length.
1187 * Returns user data length.
1188 */
1189int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1190{
1191        struct link *l_ptr;
1192        struct tipc_node *n_ptr;
1193        int res;
1194        u32 selector = msg_origport(buf_msg(buf)) & 1;
1195        u32 dummy;
1196
1197        if (destnode == tipc_own_addr)
1198                return tipc_port_recv_msg(buf);
1199
1200        read_lock_bh(&tipc_net_lock);
1201        n_ptr = tipc_node_select(destnode, selector);
1202        if (likely(n_ptr)) {
1203                tipc_node_lock(n_ptr);
1204                l_ptr = n_ptr->active_links[selector];
1205                dbg("send_fast: buf %x selected %x, destnode = %x\n",
1206                    buf, l_ptr, destnode);
1207                if (likely(l_ptr)) {
1208                        res = link_send_buf_fast(l_ptr, buf, &dummy);
1209                        tipc_node_unlock(n_ptr);
1210                        read_unlock_bh(&tipc_net_lock);
1211                        return res;
1212                }
1213                tipc_node_unlock(n_ptr);
1214        }
1215        read_unlock_bh(&tipc_net_lock);
1216        res = msg_data_sz(buf_msg(buf));
1217        tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1218        return res;
1219}
1220
1221
1222/*
1223 * tipc_link_send_sections_fast: Entry for messages where the
1224 * destination processor is known and the header is complete,
1225 * except for total message length.
1226 * Returns user data length or errno.
1227 */
1228int tipc_link_send_sections_fast(struct port *sender,
1229                                 struct iovec const *msg_sect,
1230                                 const u32 num_sect,
1231                                 u32 destaddr)
1232{
1233        struct tipc_msg *hdr = &sender->publ.phdr;
1234        struct link *l_ptr;
1235        struct sk_buff *buf;
1236        struct tipc_node *node;
1237        int res;
1238        u32 selector = msg_origport(hdr) & 1;
1239
1240again:
1241        /*
1242         * Try building message using port's max_pkt hint.
1243         * (Must not hold any locks while building message.)
1244         */
1245
1246        res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1247                        !sender->user_port, &buf);
1248
1249        read_lock_bh(&tipc_net_lock);
1250        node = tipc_node_select(destaddr, selector);
1251        if (likely(node)) {
1252                tipc_node_lock(node);
1253                l_ptr = node->active_links[selector];
1254                if (likely(l_ptr)) {
1255                        if (likely(buf)) {
1256                                res = link_send_buf_fast(l_ptr, buf,
1257                                                         &sender->publ.max_pkt);
1258                                if (unlikely(res < 0))
1259                                        buf_discard(buf);
1260exit:
1261                                tipc_node_unlock(node);
1262                                read_unlock_bh(&tipc_net_lock);
1263                                return res;
1264                        }
1265
1266                        /* Exit if build request was invalid */
1267
1268                        if (unlikely(res < 0))
1269                                goto exit;
1270
1271                        /* Exit if link (or bearer) is congested */
1272
1273                        if (link_congested(l_ptr) ||
1274                            !list_empty(&l_ptr->b_ptr->cong_links)) {
1275                                res = link_schedule_port(l_ptr,
1276                                                         sender->publ.ref, res);
1277                                goto exit;
1278                        }
1279
1280                        /*
1281                         * Message size exceeds max_pkt hint; update hint,
1282                         * then re-try fast path or fragment the message
1283                         */
1284
1285                        sender->publ.max_pkt = l_ptr->max_pkt;
1286                        tipc_node_unlock(node);
1287                        read_unlock_bh(&tipc_net_lock);
1288
1289
1290                        if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1291                                goto again;
1292
1293                        return link_send_sections_long(sender, msg_sect,
1294                                                       num_sect, destaddr);
1295                }
1296                tipc_node_unlock(node);
1297        }
1298        read_unlock_bh(&tipc_net_lock);
1299
1300        /* Couldn't find a link to the destination node */
1301
1302        if (buf)
1303                return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1304        if (res >= 0)
1305                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1306                                                 TIPC_ERR_NO_NODE);
1307        return res;
1308}
1309
1310/*
1311 * link_send_sections_long(): Entry for long messages where the
1312 * destination node is known and the header is complete,
1313 * inclusive total message length.
1314 * Link and bearer congestion status have been checked to be ok,
1315 * and are ignored if they change.
1316 *
1317 * Note that fragments do not use the full link MTU so that they won't have
1318 * to undergo refragmentation if link changeover causes them to be sent
1319 * over another link with an additional tunnel header added as prefix.
1320 * (Refragmentation will still occur if the other link has a smaller MTU.)
1321 *
1322 * Returns user data length or errno.
1323 */
1324static int link_send_sections_long(struct port *sender,
1325                                   struct iovec const *msg_sect,
1326                                   u32 num_sect,
1327                                   u32 destaddr)
1328{
1329        struct link *l_ptr;
1330        struct tipc_node *node;
1331        struct tipc_msg *hdr = &sender->publ.phdr;
1332        u32 dsz = msg_data_sz(hdr);
1333        u32 max_pkt,fragm_sz,rest;
1334        struct tipc_msg fragm_hdr;
1335        struct sk_buff *buf,*buf_chain,*prev;
1336        u32 fragm_crs,fragm_rest,hsz,sect_rest;
1337        const unchar *sect_crs;
1338        int curr_sect;
1339        u32 fragm_no;
1340
1341again:
1342        fragm_no = 1;
1343        max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1344                /* leave room for tunnel header in case of link changeover */
1345        fragm_sz = max_pkt - INT_H_SIZE;
1346                /* leave room for fragmentation header in each fragment */
1347        rest = dsz;
1348        fragm_crs = 0;
1349        fragm_rest = 0;
1350        sect_rest = 0;
1351        sect_crs = NULL;
1352        curr_sect = -1;
1353
1354        /* Prepare reusable fragment header: */
1355
1356        msg_dbg(hdr, ">FRAGMENTING>");
1357        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1358                 INT_H_SIZE, msg_destnode(hdr));
1359        msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1360        msg_set_size(&fragm_hdr, max_pkt);
1361        msg_set_fragm_no(&fragm_hdr, 1);
1362
1363        /* Prepare header of first fragment: */
1364
1365        buf_chain = buf = buf_acquire(max_pkt);
1366        if (!buf)
1367                return -ENOMEM;
1368        buf->next = NULL;
1369        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1370        hsz = msg_hdr_sz(hdr);
1371        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1372        msg_dbg(buf_msg(buf), ">BUILD>");
1373
1374        /* Chop up message: */
1375
1376        fragm_crs = INT_H_SIZE + hsz;
1377        fragm_rest = fragm_sz - hsz;
1378
1379        do {            /* For all sections */
1380                u32 sz;
1381
1382                if (!sect_rest) {
1383                        sect_rest = msg_sect[++curr_sect].iov_len;
1384                        sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1385                }
1386
1387                if (sect_rest < fragm_rest)
1388                        sz = sect_rest;
1389                else
1390                        sz = fragm_rest;
1391
1392                if (likely(!sender->user_port)) {
1393                        if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1394error:
1395                                for (; buf_chain; buf_chain = buf) {
1396                                        buf = buf_chain->next;
1397                                        buf_discard(buf_chain);
1398                                }
1399                                return -EFAULT;
1400                        }
1401                } else
1402                        skb_copy_to_linear_data_offset(buf, fragm_crs,
1403                                                       sect_crs, sz);
1404                sect_crs += sz;
1405                sect_rest -= sz;
1406                fragm_crs += sz;
1407                fragm_rest -= sz;
1408                rest -= sz;
1409
1410                if (!fragm_rest && rest) {
1411
1412                        /* Initiate new fragment: */
1413                        if (rest <= fragm_sz) {
1414                                fragm_sz = rest;
1415                                msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1416                        } else {
1417                                msg_set_type(&fragm_hdr, FRAGMENT);
1418                        }
1419                        msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1420                        msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1421                        prev = buf;
1422                        buf = buf_acquire(fragm_sz + INT_H_SIZE);
1423                        if (!buf)
1424                                goto error;
1425
1426                        buf->next = NULL;
1427                        prev->next = buf;
1428                        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1429                        fragm_crs = INT_H_SIZE;
1430                        fragm_rest = fragm_sz;
1431                        msg_dbg(buf_msg(buf),"  >BUILD>");
1432                }
1433        }
1434        while (rest > 0);
1435
1436        /*
1437         * Now we have a buffer chain. Select a link and check
1438         * that packet size is still OK
1439         */
1440        node = tipc_node_select(destaddr, sender->publ.ref & 1);
1441        if (likely(node)) {
1442                tipc_node_lock(node);
1443                l_ptr = node->active_links[sender->publ.ref & 1];
1444                if (!l_ptr) {
1445                        tipc_node_unlock(node);
1446                        goto reject;
1447                }
1448                if (l_ptr->max_pkt < max_pkt) {
1449                        sender->publ.max_pkt = l_ptr->max_pkt;
1450                        tipc_node_unlock(node);
1451                        for (; buf_chain; buf_chain = buf) {
1452                                buf = buf_chain->next;
1453                                buf_discard(buf_chain);
1454                        }
1455                        goto again;
1456                }
1457        } else {
1458reject:
1459                for (; buf_chain; buf_chain = buf) {
1460                        buf = buf_chain->next;
1461                        buf_discard(buf_chain);
1462                }
1463                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1464                                                 TIPC_ERR_NO_NODE);
1465        }
1466
1467        /* Append whole chain to send queue: */
1468
1469        buf = buf_chain;
1470        l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1471        if (!l_ptr->next_out)
1472                l_ptr->next_out = buf_chain;
1473        l_ptr->stats.sent_fragmented++;
1474        while (buf) {
1475                struct sk_buff *next = buf->next;
1476                struct tipc_msg *msg = buf_msg(buf);
1477
1478                l_ptr->stats.sent_fragments++;
1479                msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1480                link_add_to_outqueue(l_ptr, buf, msg);
1481                msg_dbg(msg, ">ADD>");
1482                buf = next;
1483        }
1484
1485        /* Send it, if possible: */
1486
1487        tipc_link_push_queue(l_ptr);
1488        tipc_node_unlock(node);
1489        return dsz;
1490}
1491
1492/*
1493 * tipc_link_push_packet: Push one unsent packet to the media
1494 */
1495u32 tipc_link_push_packet(struct link *l_ptr)
1496{
1497        struct sk_buff *buf = l_ptr->first_out;
1498        u32 r_q_size = l_ptr->retransm_queue_size;
1499        u32 r_q_head = l_ptr->retransm_queue_head;
1500
1501        /* Step to position where retransmission failed, if any,    */
1502        /* consider that buffers may have been released in meantime */
1503
1504        if (r_q_size && buf) {
1505                u32 last = lesser(mod(r_q_head + r_q_size),
1506                                  link_last_sent(l_ptr));
1507                u32 first = msg_seqno(buf_msg(buf));
1508
1509                while (buf && less(first, r_q_head)) {
1510                        first = mod(first + 1);
1511                        buf = buf->next;
1512                }
1513                l_ptr->retransm_queue_head = r_q_head = first;
1514                l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1515        }
1516
1517        /* Continue retransmission now, if there is anything: */
1518
1519        if (r_q_size && buf) {
1520                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1521                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1522                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1523                        msg_dbg(buf_msg(buf), ">DEF-RETR>");
1524                        l_ptr->retransm_queue_head = mod(++r_q_head);
1525                        l_ptr->retransm_queue_size = --r_q_size;
1526                        l_ptr->stats.retransmitted++;
1527                        return 0;
1528                } else {
1529                        l_ptr->stats.bearer_congs++;
1530                        msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1531                        return PUSH_FAILED;
1532                }
1533        }
1534
1535        /* Send deferred protocol message, if any: */
1536
1537        buf = l_ptr->proto_msg_queue;
1538        if (buf) {
1539                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1540                msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1541                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1542                        msg_dbg(buf_msg(buf), ">DEF-PROT>");
1543                        l_ptr->unacked_window = 0;
1544                        buf_discard(buf);
1545                        l_ptr->proto_msg_queue = NULL;
1546                        return 0;
1547                } else {
1548                        msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1549                        l_ptr->stats.bearer_congs++;
1550                        return PUSH_FAILED;
1551                }
1552        }
1553
1554        /* Send one deferred data message, if send window not full: */
1555
1556        buf = l_ptr->next_out;
1557        if (buf) {
1558                struct tipc_msg *msg = buf_msg(buf);
1559                u32 next = msg_seqno(msg);
1560                u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1561
1562                if (mod(next - first) < l_ptr->queue_limit[0]) {
1563                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1564                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1565                        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1566                                if (msg_user(msg) == MSG_BUNDLER)
1567                                        msg_set_type(msg, CLOSED_MSG);
1568                                msg_dbg(msg, ">PUSH-DATA>");
1569                                l_ptr->next_out = buf->next;
1570                                return 0;
1571                        } else {
1572                                msg_dbg(msg, "|PUSH-DATA|");
1573                                l_ptr->stats.bearer_congs++;
1574                                return PUSH_FAILED;
1575                        }
1576                }
1577        }
1578        return PUSH_FINISHED;
1579}
1580
1581/*
1582 * push_queue(): push out the unsent messages of a link where
1583 *               congestion has abated. Node is locked
1584 */
1585void tipc_link_push_queue(struct link *l_ptr)
1586{
1587        u32 res;
1588
1589        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1590                return;
1591
1592        do {
1593                res = tipc_link_push_packet(l_ptr);
1594        } while (!res);
1595
1596        if (res == PUSH_FAILED)
1597                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1598}
1599
1600static void link_reset_all(unsigned long addr)
1601{
1602        struct tipc_node *n_ptr;
1603        char addr_string[16];
1604        u32 i;
1605
1606        read_lock_bh(&tipc_net_lock);
1607        n_ptr = tipc_node_find((u32)addr);
1608        if (!n_ptr) {
1609                read_unlock_bh(&tipc_net_lock);
1610                return; /* node no longer exists */
1611        }
1612
1613        tipc_node_lock(n_ptr);
1614
1615        warn("Resetting all links to %s\n",
1616             tipc_addr_string_fill(addr_string, n_ptr->addr));
1617
1618        for (i = 0; i < MAX_BEARERS; i++) {
1619                if (n_ptr->links[i]) {
1620                        link_print(n_ptr->links[i], TIPC_OUTPUT,
1621                                   "Resetting link\n");
1622                        tipc_link_reset(n_ptr->links[i]);
1623                }
1624        }
1625
1626        tipc_node_unlock(n_ptr);
1627        read_unlock_bh(&tipc_net_lock);
1628}
1629
1630static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1631{
1632        struct tipc_msg *msg = buf_msg(buf);
1633
1634        warn("Retransmission failure on link <%s>\n", l_ptr->name);
1635        tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");
1636
1637        if (l_ptr->addr) {
1638
1639                /* Handle failure on standard link */
1640
1641                link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1642                tipc_link_reset(l_ptr);
1643
1644        } else {
1645
1646                /* Handle failure on broadcast link */
1647
1648                struct tipc_node *n_ptr;
1649                char addr_string[16];
1650
1651                tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1652                tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1653                                     (unsigned long) TIPC_SKB_CB(buf)->handle);
1654
1655                n_ptr = l_ptr->owner->next;
1656                tipc_node_lock(n_ptr);
1657
1658                tipc_addr_string_fill(addr_string, n_ptr->addr);
1659                tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1660                tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1661                tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1662                tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1663                tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1664                tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1665                tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1666
1667                tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1668
1669                tipc_node_unlock(n_ptr);
1670
1671                l_ptr->stale_count = 0;
1672        }
1673}
1674
1675void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1676                          u32 retransmits)
1677{
1678        struct tipc_msg *msg;
1679
1680        if (!buf)
1681                return;
1682
1683        msg = buf_msg(buf);
1684
1685        dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1686
1687        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1688                if (l_ptr->retransm_queue_size == 0) {
1689                        msg_dbg(msg, ">NO_RETR->BCONG>");
1690                        dbg_print_link(l_ptr, "   ");
1691                        l_ptr->retransm_queue_head = msg_seqno(msg);
1692                        l_ptr->retransm_queue_size = retransmits;
1693                } else {
1694                        err("Unexpected retransmit on link %s (qsize=%d)\n",
1695                            l_ptr->name, l_ptr->retransm_queue_size);
1696                }
1697                return;
1698        } else {
1699                /* Detect repeated retransmit failures on uncongested bearer */
1700
1701                if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1702                        if (++l_ptr->stale_count > 100) {
1703                                link_retransmit_failure(l_ptr, buf);
1704                                return;
1705                        }
1706                } else {
1707                        l_ptr->last_retransmitted = msg_seqno(msg);
1708                        l_ptr->stale_count = 1;
1709                }
1710        }
1711
1712        while (retransmits && (buf != l_ptr->next_out) && buf) {
1713                msg = buf_msg(buf);
1714                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1715                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1716                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1717                        msg_dbg(buf_msg(buf), ">RETR>");
1718                        buf = buf->next;
1719                        retransmits--;
1720                        l_ptr->stats.retransmitted++;
1721                } else {
1722                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1723                        l_ptr->stats.bearer_congs++;
1724                        l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1725                        l_ptr->retransm_queue_size = retransmits;
1726                        return;
1727                }
1728        }
1729
1730        l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1731}
1732
1733/**
1734 * link_insert_deferred_queue - insert deferred messages back into receive chain
1735 */
1736
1737static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1738                                                  struct sk_buff *buf)
1739{
1740        u32 seq_no;
1741
1742        if (l_ptr->oldest_deferred_in == NULL)
1743                return buf;
1744
1745        seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1746        if (seq_no == mod(l_ptr->next_in_no)) {
1747                l_ptr->newest_deferred_in->next = buf;
1748                buf = l_ptr->oldest_deferred_in;
1749                l_ptr->oldest_deferred_in = NULL;
1750                l_ptr->deferred_inqueue_sz = 0;
1751        }
1752        return buf;
1753}
1754
1755/**
1756 * link_recv_buf_validate - validate basic format of received message
1757 *
1758 * This routine ensures a TIPC message has an acceptable header, and at least
1759 * as much data as the header indicates it should.  The routine also ensures
1760 * that the entire message header is stored in the main fragment of the message
1761 * buffer, to simplify future access to message header fields.
1762 *
1763 * Note: Having extra info present in the message header or data areas is OK.
1764 * TIPC will ignore the excess, under the assumption that it is optional info
1765 * introduced by a later release of the protocol.
1766 */
1767
1768static int link_recv_buf_validate(struct sk_buff *buf)
1769{
1770        static u32 min_data_hdr_size[8] = {
1771                SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1772                MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1773                };
1774
1775        struct tipc_msg *msg;
1776        u32 tipc_hdr[2];
1777        u32 size;
1778        u32 hdr_size;
1779        u32 min_hdr_size;
1780
1781        if (unlikely(buf->len < MIN_H_SIZE))
1782                return 0;
1783
1784        msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1785        if (msg == NULL)
1786                return 0;
1787
1788        if (unlikely(msg_version(msg) != TIPC_VERSION))
1789                return 0;
1790
1791        size = msg_size(msg);
1792        hdr_size = msg_hdr_sz(msg);
1793        min_hdr_size = msg_isdata(msg) ?
1794                min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1795
1796        if (unlikely((hdr_size < min_hdr_size) ||
1797                     (size < hdr_size) ||
1798                     (buf->len < size) ||
1799                     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1800                return 0;
1801
1802        return pskb_may_pull(buf, hdr_size);
1803}
1804
1805void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1806{
1807        read_lock_bh(&tipc_net_lock);
1808        while (head) {
1809                struct bearer *b_ptr = (struct bearer *)tb_ptr;
1810                struct tipc_node *n_ptr;
1811                struct link *l_ptr;
1812                struct sk_buff *crs;
1813                struct sk_buff *buf = head;
1814                struct tipc_msg *msg;
1815                u32 seq_no;
1816                u32 ackd;
1817                u32 released = 0;
1818                int type;
1819
1820                head = head->next;
1821
1822                /* Ensure message is well-formed */
1823
1824                if (unlikely(!link_recv_buf_validate(buf)))
1825                        goto cont;
1826
1827                /* Ensure message data is a single contiguous unit */
1828
1829                if (unlikely(buf_linearize(buf))) {
1830                        goto cont;
1831                }
1832
1833                /* Handle arrival of a non-unicast link message */
1834
1835                msg = buf_msg(buf);
1836
1837                if (unlikely(msg_non_seq(msg))) {
1838                        if (msg_user(msg) ==  LINK_CONFIG)
1839                                tipc_disc_recv_msg(buf, b_ptr);
1840                        else
1841                                tipc_bclink_recv_pkt(buf);
1842                        continue;
1843                }
1844
1845                if (unlikely(!msg_short(msg) &&
1846                             (msg_destnode(msg) != tipc_own_addr)))
1847                        goto cont;
1848
1849                /* Discard non-routeable messages destined for another node */
1850
1851                if (unlikely(!msg_isdata(msg) &&
1852                             (msg_destnode(msg) != tipc_own_addr))) {
1853                        if ((msg_user(msg) != CONN_MANAGER) &&
1854                            (msg_user(msg) != MSG_FRAGMENTER))
1855                                goto cont;
1856                }
1857
1858                /* Locate unicast link endpoint that should handle message */
1859
1860                n_ptr = tipc_node_find(msg_prevnode(msg));
1861                if (unlikely(!n_ptr))
1862                        goto cont;
1863                tipc_node_lock(n_ptr);
1864
1865                l_ptr = n_ptr->links[b_ptr->identity];
1866                if (unlikely(!l_ptr)) {
1867                        tipc_node_unlock(n_ptr);
1868                        goto cont;
1869                }
1870
1871                /* Validate message sequence number info */
1872
1873                seq_no = msg_seqno(msg);
1874                ackd = msg_ack(msg);
1875
1876                /* Release acked messages */
1877
1878                if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1879                        if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1880                                tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1881                }
1882
1883                crs = l_ptr->first_out;
1884                while ((crs != l_ptr->next_out) &&
1885                       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1886                        struct sk_buff *next = crs->next;
1887
1888                        buf_discard(crs);
1889                        crs = next;
1890                        released++;
1891                }
1892                if (released) {
1893                        l_ptr->first_out = crs;
1894                        l_ptr->out_queue_size -= released;
1895                }
1896
1897                /* Try sending any messages link endpoint has pending */
1898
1899                if (unlikely(l_ptr->next_out))
1900                        tipc_link_push_queue(l_ptr);
1901                if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1902                        tipc_link_wakeup_ports(l_ptr, 0);
1903                if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1904                        l_ptr->stats.sent_acks++;
1905                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1906                }
1907
1908                /* Now (finally!) process the incoming message */
1909
1910protocol_check:
1911                if (likely(link_working_working(l_ptr))) {
1912                        if (likely(seq_no == mod(l_ptr->next_in_no))) {
1913                                l_ptr->next_in_no++;
1914                                if (unlikely(l_ptr->oldest_deferred_in))
1915                                        head = link_insert_deferred_queue(l_ptr,
1916                                                                          head);
1917                                if (likely(msg_is_dest(msg, tipc_own_addr))) {
1918deliver:
1919                                        if (likely(msg_isdata(msg))) {
1920                                                tipc_node_unlock(n_ptr);
1921                                                tipc_port_recv_msg(buf);
1922                                                continue;
1923                                        }
1924                                        switch (msg_user(msg)) {
1925                                        case MSG_BUNDLER:
1926                                                l_ptr->stats.recv_bundles++;
1927                                                l_ptr->stats.recv_bundled +=
1928                                                        msg_msgcnt(msg);
1929                                                tipc_node_unlock(n_ptr);
1930                                                tipc_link_recv_bundle(buf);
1931                                                continue;
1932                                        case ROUTE_DISTRIBUTOR:
1933                                                tipc_node_unlock(n_ptr);
1934                                                tipc_cltr_recv_routing_table(buf);
1935                                                continue;
1936                                        case NAME_DISTRIBUTOR:
1937                                                tipc_node_unlock(n_ptr);
1938                                                tipc_named_recv(buf);
1939                                                continue;
1940                                        case CONN_MANAGER:
1941                                                tipc_node_unlock(n_ptr);
1942                                                tipc_port_recv_proto_msg(buf);
1943                                                continue;
1944                                        case MSG_FRAGMENTER:
1945                                                l_ptr->stats.recv_fragments++;
1946                                                if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1947                                                                            &buf, &msg)) {
1948                                                        l_ptr->stats.recv_fragmented++;
1949                                                        goto deliver;
1950                                                }
1951                                                break;
1952                                        case CHANGEOVER_PROTOCOL:
1953                                                type = msg_type(msg);
1954                                                if (link_recv_changeover_msg(&l_ptr, &buf)) {
1955                                                        msg = buf_msg(buf);
1956                                                        seq_no = msg_seqno(msg);
1957                                                        if (type == ORIGINAL_MSG)
1958                                                                goto deliver;
1959                                                        goto protocol_check;
1960                                                }
1961                                                break;
1962                                        }
1963                                }
1964                                tipc_node_unlock(n_ptr);
1965                                tipc_net_route_msg(buf);
1966                                continue;
1967                        }
1968                        link_handle_out_of_seq_msg(l_ptr, buf);
1969                        head = link_insert_deferred_queue(l_ptr, head);
1970                        tipc_node_unlock(n_ptr);
1971                        continue;
1972                }
1973
1974                if (msg_user(msg) == LINK_PROTOCOL) {
1975                        link_recv_proto_msg(l_ptr, buf);
1976                        head = link_insert_deferred_queue(l_ptr, head);
1977                        tipc_node_unlock(n_ptr);
1978                        continue;
1979                }
1980                msg_dbg(msg,"NSEQ<REC<");
1981                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1982
1983                if (link_working_working(l_ptr)) {
1984                        /* Re-insert in front of queue */
1985                        msg_dbg(msg,"RECV-REINS:");
1986                        buf->next = head;
1987                        head = buf;
1988                        tipc_node_unlock(n_ptr);
1989                        continue;
1990                }
1991                tipc_node_unlock(n_ptr);
1992cont:
1993                buf_discard(buf);
1994        }
1995        read_unlock_bh(&tipc_net_lock);
1996}
1997
1998/*
1999 * link_defer_buf(): Sort a received out-of-sequence packet
2000 *                   into the deferred reception queue.
2001 * Returns the increase of the queue length,i.e. 0 or 1
2002 */
2003
2004u32 tipc_link_defer_pkt(struct sk_buff **head,
2005                        struct sk_buff **tail,
2006                        struct sk_buff *buf)
2007{
2008        struct sk_buff *prev = NULL;
2009        struct sk_buff *crs = *head;
2010        u32 seq_no = msg_seqno(buf_msg(buf));
2011
2012        buf->next = NULL;
2013
2014        /* Empty queue ? */
2015        if (*head == NULL) {
2016                *head = *tail = buf;
2017                return 1;
2018        }
2019
2020        /* Last ? */
2021        if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
2022                (*tail)->next = buf;
2023                *tail = buf;
2024                return 1;
2025        }
2026
2027        /* Scan through queue and sort it in */
2028        do {
2029                struct tipc_msg *msg = buf_msg(crs);
2030
2031                if (less(seq_no, msg_seqno(msg))) {
2032                        buf->next = crs;
2033                        if (prev)
2034                                prev->next = buf;
2035                        else
2036                                *head = buf;
2037                        return 1;
2038                }
2039                if (seq_no == msg_seqno(msg)) {
2040                        break;
2041                }
2042                prev = crs;
2043                crs = crs->next;
2044        }
2045        while (crs);
2046
2047        /* Message is a duplicate of an existing message */
2048
2049        buf_discard(buf);
2050        return 0;
2051}
2052
2053/**
2054 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2055 */
2056
2057static void link_handle_out_of_seq_msg(struct link *l_ptr,
2058                                       struct sk_buff *buf)
2059{
2060        u32 seq_no = msg_seqno(buf_msg(buf));
2061
2062        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2063                link_recv_proto_msg(l_ptr, buf);
2064                return;
2065        }
2066
2067        dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2068            seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2069
2070        /* Record OOS packet arrival (force mismatch on next timeout) */
2071
2072        l_ptr->checkpoint--;
2073
2074        /*
2075         * Discard packet if a duplicate; otherwise add it to deferred queue
2076         * and notify peer of gap as per protocol specification
2077         */
2078
2079        if (less(seq_no, mod(l_ptr->next_in_no))) {
2080                l_ptr->stats.duplicates++;
2081                buf_discard(buf);
2082                return;
2083        }
2084
2085        if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2086                                &l_ptr->newest_deferred_in, buf)) {
2087                l_ptr->deferred_inqueue_sz++;
2088                l_ptr->stats.deferred_recv++;
2089                if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2090                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2091        } else
2092                l_ptr->stats.duplicates++;
2093}
2094
2095/*
2096 * Send protocol message to the other endpoint.
2097 */
2098void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2099                              u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2100{
2101        struct sk_buff *buf = NULL;
2102        struct tipc_msg *msg = l_ptr->pmsg;
2103        u32 msg_size = sizeof(l_ptr->proto_msg);
2104
2105        if (link_blocked(l_ptr))
2106                return;
2107        msg_set_type(msg, msg_typ);
2108        msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2109        msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2110        msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2111
2112        if (msg_typ == STATE_MSG) {
2113                u32 next_sent = mod(l_ptr->next_out_no);
2114
2115                if (!tipc_link_is_up(l_ptr))
2116                        return;
2117                if (l_ptr->next_out)
2118                        next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2119                msg_set_next_sent(msg, next_sent);
2120                if (l_ptr->oldest_deferred_in) {
2121                        u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2122                        gap = mod(rec - mod(l_ptr->next_in_no));
2123                }
2124                msg_set_seq_gap(msg, gap);
2125                if (gap)
2126                        l_ptr->stats.sent_nacks++;
2127                msg_set_link_tolerance(msg, tolerance);
2128                msg_set_linkprio(msg, priority);
2129                msg_set_max_pkt(msg, ack_mtu);
2130                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2131                msg_set_probe(msg, probe_msg != 0);
2132                if (probe_msg) {
2133                        u32 mtu = l_ptr->max_pkt;
2134
2135                        if ((mtu < l_ptr->max_pkt_target) &&
2136                            link_working_working(l_ptr) &&
2137                            l_ptr->fsm_msg_cnt) {
2138                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2139                                if (l_ptr->max_pkt_probes == 10) {
2140                                        l_ptr->max_pkt_target = (msg_size - 4);
2141                                        l_ptr->max_pkt_probes = 0;
2142                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2143                                }
2144                                l_ptr->max_pkt_probes++;
2145                        }
2146
2147                        l_ptr->stats.sent_probes++;
2148                }
2149                l_ptr->stats.sent_states++;
2150        } else {                /* RESET_MSG or ACTIVATE_MSG */
2151                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2152                msg_set_seq_gap(msg, 0);
2153                msg_set_next_sent(msg, 1);
2154                msg_set_link_tolerance(msg, l_ptr->tolerance);
2155                msg_set_linkprio(msg, l_ptr->priority);
2156                msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2157        }
2158
2159        if (tipc_node_has_redundant_links(l_ptr->owner)) {
2160                msg_set_redundant_link(msg);
2161        } else {
2162                msg_clear_redundant_link(msg);
2163        }
2164        msg_set_linkprio(msg, l_ptr->priority);
2165
2166        /* Ensure sequence number will not fit : */
2167
2168        msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2169
2170        /* Congestion? */
2171
2172        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2173                if (!l_ptr->proto_msg_queue) {
2174                        l_ptr->proto_msg_queue =
2175                                buf_acquire(sizeof(l_ptr->proto_msg));
2176                }
2177                buf = l_ptr->proto_msg_queue;
2178                if (!buf)
2179                        return;
2180                skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2181                return;
2182        }
2183        msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2184
2185        /* Message can be sent */
2186
2187        msg_dbg(msg, ">>");
2188
2189        buf = buf_acquire(msg_size);
2190        if (!buf)
2191                return;
2192
2193        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2194        msg_set_size(buf_msg(buf), msg_size);
2195
2196        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2197                l_ptr->unacked_window = 0;
2198                buf_discard(buf);
2199                return;
2200        }
2201
2202        /* New congestion */
2203        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2204        l_ptr->proto_msg_queue = buf;
2205        l_ptr->stats.bearer_congs++;
2206}
2207
2208/*
2209 * Receive protocol message :
2210 * Note that network plane id propagates through the network, and may
2211 * change at any time. The node with lowest address rules
2212 */
2213
2214static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2215{
2216        u32 rec_gap = 0;
2217        u32 max_pkt_info;
2218        u32 max_pkt_ack;
2219        u32 msg_tol;
2220        struct tipc_msg *msg = buf_msg(buf);
2221
2222        dbg("AT(%u):", jiffies_to_msecs(jiffies));
2223        msg_dbg(msg, "<<");
2224        if (link_blocked(l_ptr))
2225                goto exit;
2226
2227        /* record unnumbered packet arrival (force mismatch on next timeout) */
2228
2229        l_ptr->checkpoint--;
2230
2231        if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2232                if (tipc_own_addr > msg_prevnode(msg))
2233                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2234
2235        l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2236
2237        switch (msg_type(msg)) {
2238
2239        case RESET_MSG:
2240                if (!link_working_unknown(l_ptr) &&
2241                    (l_ptr->peer_session != INVALID_SESSION)) {
2242                        if (msg_session(msg) == l_ptr->peer_session) {
2243                                dbg("Duplicate RESET: %u<->%u\n",
2244                                    msg_session(msg), l_ptr->peer_session);
2245                                break; /* duplicate: ignore */
2246                        }
2247                }
2248                /* fall thru' */
2249        case ACTIVATE_MSG:
2250                /* Update link settings according other endpoint's values */
2251
2252                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2253
2254                if ((msg_tol = msg_link_tolerance(msg)) &&
2255                    (msg_tol > l_ptr->tolerance))
2256                        link_set_supervision_props(l_ptr, msg_tol);
2257
2258                if (msg_linkprio(msg) > l_ptr->priority)
2259                        l_ptr->priority = msg_linkprio(msg);
2260
2261                max_pkt_info = msg_max_pkt(msg);
2262                if (max_pkt_info) {
2263                        if (max_pkt_info < l_ptr->max_pkt_target)
2264                                l_ptr->max_pkt_target = max_pkt_info;
2265                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2266                                l_ptr->max_pkt = l_ptr->max_pkt_target;
2267                } else {
2268                        l_ptr->max_pkt = l_ptr->max_pkt_target;
2269                }
2270                l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2271
2272                link_state_event(l_ptr, msg_type(msg));
2273
2274                l_ptr->peer_session = msg_session(msg);
2275                l_ptr->peer_bearer_id = msg_bearer_id(msg);
2276
2277                /* Synchronize broadcast sequence numbers */
2278                if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2279                        l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2280                }
2281                break;
2282        case STATE_MSG:
2283
2284                if ((msg_tol = msg_link_tolerance(msg)))
2285                        link_set_supervision_props(l_ptr, msg_tol);
2286
2287                if (msg_linkprio(msg) &&
2288                    (msg_linkprio(msg) != l_ptr->priority)) {
2289                        warn("Resetting link <%s>, priority change %u->%u\n",
2290                             l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2291                        l_ptr->priority = msg_linkprio(msg);
2292                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
2293                        break;
2294                }
2295                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2296                l_ptr->stats.recv_states++;
2297                if (link_reset_unknown(l_ptr))
2298                        break;
2299
2300                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2301                        rec_gap = mod(msg_next_sent(msg) -
2302                                      mod(l_ptr->next_in_no));
2303                }
2304
2305                max_pkt_ack = msg_max_pkt(msg);
2306                if (max_pkt_ack > l_ptr->max_pkt) {
2307                        dbg("Link <%s> updated MTU %u -> %u\n",
2308                            l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2309                        l_ptr->max_pkt = max_pkt_ack;
2310                        l_ptr->max_pkt_probes = 0;
2311                }
2312
2313                max_pkt_ack = 0;
2314                if (msg_probe(msg)) {
2315                        l_ptr->stats.recv_probes++;
2316                        if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2317                                max_pkt_ack = msg_size(msg);
2318                        }
2319                }
2320
2321                /* Protocol message before retransmits, reduce loss risk */
2322
2323                tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2324
2325                if (rec_gap || (msg_probe(msg))) {
2326                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2327                                                 0, rec_gap, 0, 0, max_pkt_ack);
2328                }
2329                if (msg_seq_gap(msg)) {
2330                        msg_dbg(msg, "With Gap:");
2331                        l_ptr->stats.recv_nacks++;
2332                        tipc_link_retransmit(l_ptr, l_ptr->first_out,
2333                                             msg_seq_gap(msg));
2334                }
2335                break;
2336        default:
2337                msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2338        }
2339exit:
2340        buf_discard(buf);
2341}
2342
2343
2344/*
2345 * tipc_link_tunnel(): Send one message via a link belonging to
2346 * another bearer. Owner node is locked.
2347 */
2348void tipc_link_tunnel(struct link *l_ptr,
2349                      struct tipc_msg *tunnel_hdr,
2350                      struct tipc_msg  *msg,
2351                      u32 selector)
2352{
2353        struct link *tunnel;
2354        struct sk_buff *buf;
2355        u32 length = msg_size(msg);
2356
2357        tunnel = l_ptr->owner->active_links[selector & 1];
2358        if (!tipc_link_is_up(tunnel)) {
2359                warn("Link changeover error, "
2360                     "tunnel link no longer available\n");
2361                return;
2362        }
2363        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2364        buf = buf_acquire(length + INT_H_SIZE);
2365        if (!buf) {
2366                warn("Link changeover error, "
2367                     "unable to send tunnel msg\n");
2368                return;
2369        }
2370        skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2371        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2372        dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2373        msg_dbg(buf_msg(buf), ">SEND>");
2374        tipc_link_send_buf(tunnel, buf);
2375}
2376
2377
2378
2379/*
2380 * changeover(): Send whole message queue via the remaining link
2381 *               Owner node is locked.
2382 */
2383
2384void tipc_link_changeover(struct link *l_ptr)
2385{
2386        u32 msgcount = l_ptr->out_queue_size;
2387        struct sk_buff *crs = l_ptr->first_out;
2388        struct link *tunnel = l_ptr->owner->active_links[0];
2389        struct tipc_msg tunnel_hdr;
2390        int split_bundles;
2391
2392        if (!tunnel)
2393                return;
2394
2395        if (!l_ptr->owner->permit_changeover) {
2396                warn("Link changeover error, "
2397                     "peer did not permit changeover\n");
2398                return;
2399        }
2400
2401        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2402                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2403        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2404        msg_set_msgcnt(&tunnel_hdr, msgcount);
2405        dbg("Link changeover requires %u tunnel messages\n", msgcount);
2406
2407        if (!l_ptr->first_out) {
2408                struct sk_buff *buf;
2409
2410                buf = buf_acquire(INT_H_SIZE);
2411                if (buf) {
2412                        skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2413                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
2414                        dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2415                            tunnel->b_ptr->net_plane);
2416                        msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2417                        tipc_link_send_buf(tunnel, buf);
2418                } else {
2419                        warn("Link changeover error, "
2420                             "unable to send changeover msg\n");
2421                }
2422                return;
2423        }
2424
2425        split_bundles = (l_ptr->owner->active_links[0] !=
2426                         l_ptr->owner->active_links[1]);
2427
2428        while (crs) {
2429                struct tipc_msg *msg = buf_msg(crs);
2430
2431                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2432                        struct tipc_msg *m = msg_get_wrapped(msg);
2433                        unchar* pos = (unchar*)m;
2434
2435                        msgcount = msg_msgcnt(msg);
2436                        while (msgcount--) {
2437                                msg_set_seqno(m,msg_seqno(msg));
2438                                tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2439                                                 msg_link_selector(m));
2440                                pos += align(msg_size(m));
2441                                m = (struct tipc_msg *)pos;
2442                        }
2443                } else {
2444                        tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2445                                         msg_link_selector(msg));
2446                }
2447                crs = crs->next;
2448        }
2449}
2450
2451void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2452{
2453        struct sk_buff *iter;
2454        struct tipc_msg tunnel_hdr;
2455
2456        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2457                 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2458        msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2459        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2460        iter = l_ptr->first_out;
2461        while (iter) {
2462                struct sk_buff *outbuf;
2463                struct tipc_msg *msg = buf_msg(iter);
2464                u32 length = msg_size(msg);
2465
2466                if (msg_user(msg) == MSG_BUNDLER)
2467                        msg_set_type(msg, CLOSED_MSG);
2468                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2469                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2470                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2471                outbuf = buf_acquire(length + INT_H_SIZE);
2472                if (outbuf == NULL) {
2473                        warn("Link changeover error, "
2474                             "unable to send duplicate msg\n");
2475                        return;
2476                }
2477                skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2478                skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2479                                               length);
2480                dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2481                    tunnel->b_ptr->net_plane);
2482                msg_dbg(buf_msg(outbuf), ">SEND>");
2483                tipc_link_send_buf(tunnel, outbuf);
2484                if (!tipc_link_is_up(l_ptr))
2485                        return;
2486                iter = iter->next;
2487        }
2488}
2489
2490
2491
2492/**
2493 * buf_extract - extracts embedded TIPC message from another message
2494 * @skb: encapsulating message buffer
2495 * @from_pos: offset to extract from
2496 *
2497 * Returns a new message buffer containing an embedded message.  The
2498 * encapsulating message itself is left unchanged.
2499 */
2500
2501static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2502{
2503        struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2504        u32 size = msg_size(msg);
2505        struct sk_buff *eb;
2506
2507        eb = buf_acquire(size);
2508        if (eb)
2509                skb_copy_to_linear_data(eb, msg, size);
2510        return eb;
2511}
2512
2513/*
2514 *  link_recv_changeover_msg(): Receive tunneled packet sent
2515 *  via other link. Node is locked. Return extracted buffer.
2516 */
2517
2518static int link_recv_changeover_msg(struct link **l_ptr,
2519                                    struct sk_buff **buf)
2520{
2521        struct sk_buff *tunnel_buf = *buf;
2522        struct link *dest_link;
2523        struct tipc_msg *msg;
2524        struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2525        u32 msg_typ = msg_type(tunnel_msg);
2526        u32 msg_count = msg_msgcnt(tunnel_msg);
2527
2528        dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2529        if (!dest_link) {
2530                msg_dbg(tunnel_msg, "NOLINK/<REC<");
2531                goto exit;
2532        }
2533        if (dest_link == *l_ptr) {
2534                err("Unexpected changeover message on link <%s>\n",
2535                    (*l_ptr)->name);
2536                goto exit;
2537        }
2538        dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2539            (*l_ptr)->b_ptr->net_plane);
2540        *l_ptr = dest_link;
2541        msg = msg_get_wrapped(tunnel_msg);
2542
2543        if (msg_typ == DUPLICATE_MSG) {
2544                if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2545                        msg_dbg(tunnel_msg, "DROP/<REC<");
2546                        goto exit;
2547                }
2548                *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2549                if (*buf == NULL) {
2550                        warn("Link changeover error, duplicate msg dropped\n");
2551                        goto exit;
2552                }
2553                msg_dbg(tunnel_msg, "TNL<REC<");
2554                buf_discard(tunnel_buf);
2555                return 1;
2556        }
2557
2558        /* First original message ?: */
2559
2560        if (tipc_link_is_up(dest_link)) {
2561                msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2562                info("Resetting link <%s>, changeover initiated by peer\n",
2563                     dest_link->name);
2564                tipc_link_reset(dest_link);
2565                dest_link->exp_msg_count = msg_count;
2566                dbg("Expecting %u tunnelled messages\n", msg_count);
2567                if (!msg_count)
2568                        goto exit;
2569        } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2570                msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2571                dest_link->exp_msg_count = msg_count;
2572                dbg("Expecting %u tunnelled messages\n", msg_count);
2573                if (!msg_count)
2574                        goto exit;
2575        }
2576
2577        /* Receive original message */
2578
2579        if (dest_link->exp_msg_count == 0) {
2580                warn("Link switchover error, "
2581                     "got too many tunnelled messages\n");
2582                msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2583                dbg_print_link(dest_link, "LINK:");
2584                goto exit;
2585        }
2586        dest_link->exp_msg_count--;
2587        if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2588                msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2589                goto exit;
2590        } else {
2591                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2592                if (*buf != NULL) {
2593                        msg_dbg(tunnel_msg, "TNL<REC<");
2594                        buf_discard(tunnel_buf);
2595                        return 1;
2596                } else {
2597                        warn("Link changeover error, original msg dropped\n");
2598                }
2599        }
2600exit:
2601        *buf = NULL;
2602        buf_discard(tunnel_buf);
2603        return 0;
2604}
2605
2606/*
2607 *  Bundler functionality:
2608 */
2609void tipc_link_recv_bundle(struct sk_buff *buf)
2610{
2611        u32 msgcount = msg_msgcnt(buf_msg(buf));
2612        u32 pos = INT_H_SIZE;
2613        struct sk_buff *obuf;
2614
2615        msg_dbg(buf_msg(buf), "<BNDL<: ");
2616        while (msgcount--) {
2617                obuf = buf_extract(buf, pos);
2618                if (obuf == NULL) {
2619                        warn("Link unable to unbundle message(s)\n");
2620                        break;
2621                }
2622                pos += align(msg_size(buf_msg(obuf)));
2623                msg_dbg(buf_msg(obuf), "     /");
2624                tipc_net_route_msg(obuf);
2625        }
2626        buf_discard(buf);
2627}
2628
2629/*
2630 *  Fragmentation/defragmentation:
2631 */
2632
2633
2634/*
2635 * tipc_link_send_long_buf: Entry for buffers needing fragmentation.
2636 * The buffer is complete, inclusive total message length.
2637 * Returns user data length.
2638 */
2639int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2640{
2641        struct tipc_msg *inmsg = buf_msg(buf);
2642        struct tipc_msg fragm_hdr;
2643        u32 insize = msg_size(inmsg);
2644        u32 dsz = msg_data_sz(inmsg);
2645        unchar *crs = buf->data;
2646        u32 rest = insize;
2647        u32 pack_sz = l_ptr->max_pkt;
2648        u32 fragm_sz = pack_sz - INT_H_SIZE;
2649        u32 fragm_no = 1;
2650        u32 destaddr;
2651
2652        if (msg_short(inmsg))
2653                destaddr = l_ptr->addr;
2654        else
2655                destaddr = msg_destnode(inmsg);
2656
2657        if (msg_routed(inmsg))
2658                msg_set_prevnode(inmsg, tipc_own_addr);
2659
2660        /* Prepare reusable fragment header: */
2661
2662        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2663                 INT_H_SIZE, destaddr);
2664        msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2665        msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2666        msg_set_fragm_no(&fragm_hdr, fragm_no);
2667        l_ptr->stats.sent_fragmented++;
2668
2669        /* Chop up message: */
2670
2671        while (rest > 0) {
2672                struct sk_buff *fragm;
2673
2674                if (rest <= fragm_sz) {
2675                        fragm_sz = rest;
2676                        msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2677                }
2678                fragm = buf_acquire(fragm_sz + INT_H_SIZE);
2679                if (fragm == NULL) {
2680                        warn("Link unable to fragment message\n");
2681                        dsz = -ENOMEM;
2682                        goto exit;
2683                }
2684                msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2685                skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2686                skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2687                                               fragm_sz);
2688                /*  Send queued messages first, if any: */
2689
2690                l_ptr->stats.sent_fragments++;
2691                tipc_link_send_buf(l_ptr, fragm);
2692                if (!tipc_link_is_up(l_ptr))
2693                        return dsz;
2694                msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2695                rest -= fragm_sz;
2696                crs += fragm_sz;
2697                msg_set_type(&fragm_hdr, FRAGMENT);
2698        }
2699exit:
2700        buf_discard(buf);
2701        return dsz;
2702}
2703
2704/*
2705 * A pending message being re-assembled must store certain values
2706 * to handle subsequent fragments correctly. The following functions
2707 * help storing these values in unused, available fields in the
2708 * pending message. This makes dynamic memory allocation unecessary.
2709 */
2710
2711static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2712{
2713        msg_set_seqno(buf_msg(buf), seqno);
2714}
2715
2716static u32 get_fragm_size(struct sk_buff *buf)
2717{
2718        return msg_ack(buf_msg(buf));
2719}
2720
2721static void set_fragm_size(struct sk_buff *buf, u32 sz)
2722{
2723        msg_set_ack(buf_msg(buf), sz);
2724}
2725
2726static u32 get_expected_frags(struct sk_buff *buf)
2727{
2728        return msg_bcast_ack(buf_msg(buf));
2729}
2730
2731static void set_expected_frags(struct sk_buff *buf, u32 exp)
2732{
2733        msg_set_bcast_ack(buf_msg(buf), exp);
2734}
2735
2736static u32 get_timer_cnt(struct sk_buff *buf)
2737{
2738        return msg_reroute_cnt(buf_msg(buf));
2739}
2740
2741static void incr_timer_cnt(struct sk_buff *buf)
2742{
2743        msg_incr_reroute_cnt(buf_msg(buf));
2744}
2745
2746/*
2747 * tipc_link_recv_fragment(): Called with node lock on. Returns
2748 * the reassembled buffer if message is complete.
2749 */
2750int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2751                            struct tipc_msg **m)
2752{
2753        struct sk_buff *prev = NULL;
2754        struct sk_buff *fbuf = *fb;
2755        struct tipc_msg *fragm = buf_msg(fbuf);
2756        struct sk_buff *pbuf = *pending;
2757        u32 long_msg_seq_no = msg_long_msgno(fragm);
2758
2759        *fb = NULL;
2760        msg_dbg(fragm,"FRG<REC<");
2761
2762        /* Is there an incomplete message waiting for this fragment? */
2763
2764        while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2765                        (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2766                prev = pbuf;
2767                pbuf = pbuf->next;
2768        }
2769
2770        if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2771                struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2772                u32 msg_sz = msg_size(imsg);
2773                u32 fragm_sz = msg_data_sz(fragm);
2774                u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2775                u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2776                if (msg_type(imsg) == TIPC_MCAST_MSG)
2777                        max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2778                if (msg_size(imsg) > max) {
2779                        msg_dbg(fragm,"<REC<Oversized: ");
2780                        buf_discard(fbuf);
2781                        return 0;
2782                }
2783                pbuf = buf_acquire(msg_size(imsg));
2784                if (pbuf != NULL) {
2785                        pbuf->next = *pending;
2786                        *pending = pbuf;
2787                        skb_copy_to_linear_data(pbuf, imsg,
2788                                                msg_data_sz(fragm));
2789                        /*  Prepare buffer for subsequent fragments. */
2790
2791                        set_long_msg_seqno(pbuf, long_msg_seq_no);
2792                        set_fragm_size(pbuf,fragm_sz);
2793                        set_expected_frags(pbuf,exp_fragm_cnt - 1);
2794                } else {
2795                        warn("Link unable to reassemble fragmented message\n");
2796                }
2797                buf_discard(fbuf);
2798                return 0;
2799        } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2800                u32 dsz = msg_data_sz(fragm);
2801                u32 fsz = get_fragm_size(pbuf);
2802                u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2803                u32 exp_frags = get_expected_frags(pbuf) - 1;
2804                skb_copy_to_linear_data_offset(pbuf, crs,
2805                                               msg_data(fragm), dsz);
2806                buf_discard(fbuf);
2807
2808                /* Is message complete? */
2809
2810                if (exp_frags == 0) {
2811                        if (prev)
2812                                prev->next = pbuf->next;
2813                        else
2814                                *pending = pbuf->next;
2815                        msg_reset_reroute_cnt(buf_msg(pbuf));
2816                        *fb = pbuf;
2817                        *m = buf_msg(pbuf);
2818                        return 1;
2819                }
2820                set_expected_frags(pbuf,exp_frags);
2821                return 0;
2822        }
2823        dbg(" Discarding orphan fragment %x\n",fbuf);
2824        msg_dbg(fragm,"ORPHAN:");
2825        dbg("Pending long buffers:\n");
2826        dbg_print_buf_chain(*pending);
2827        buf_discard(fbuf);
2828        return 0;
2829}
2830
2831/**
2832 * link_check_defragm_bufs - flush stale incoming message fragments
2833 * @l_ptr: pointer to link
2834 */
2835
2836static void link_check_defragm_bufs(struct link *l_ptr)
2837{
2838        struct sk_buff *prev = NULL;
2839        struct sk_buff *next = NULL;
2840        struct sk_buff *buf = l_ptr->defragm_buf;
2841
2842        if (!buf)
2843                return;
2844        if (!link_working_working(l_ptr))
2845                return;
2846        while (buf) {
2847                u32 cnt = get_timer_cnt(buf);
2848
2849                next = buf->next;
2850                if (cnt < 4) {
2851                        incr_timer_cnt(buf);
2852                        prev = buf;
2853                } else {
2854                        dbg(" Discarding incomplete long buffer\n");
2855                        msg_dbg(buf_msg(buf), "LONG:");
2856                        dbg_print_link(l_ptr, "curr:");
2857                        dbg("Pending long buffers:\n");
2858                        dbg_print_buf_chain(l_ptr->defragm_buf);
2859                        if (prev)
2860                                prev->next = buf->next;
2861                        else
2862                                l_ptr->defragm_buf = buf->next;
2863                        buf_discard(buf);
2864                }
2865                buf = next;
2866        }
2867}
2868
2869
2870
2871static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2872{
2873        l_ptr->tolerance = tolerance;
2874        l_ptr->continuity_interval =
2875                ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2876        l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2877}
2878
2879
2880void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2881{
2882        /* Data messages from this node, inclusive FIRST_FRAGM */
2883        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2884        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2885        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2886        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2887        /* Transiting data messages,inclusive FIRST_FRAGM */
2888        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2889        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2890        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2891        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2892        l_ptr->queue_limit[CONN_MANAGER] = 1200;
2893        l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2894        l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2895        l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2896        /* FRAGMENT and LAST_FRAGMENT packets */
2897        l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2898}
2899
2900/**
2901 * link_find_link - locate link by name
2902 * @name - ptr to link name string
2903 * @node - ptr to area to be filled with ptr to associated node
2904 *
2905 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2906 * this also prevents link deletion.
2907 *
2908 * Returns pointer to link (or 0 if invalid link name).
2909 */
2910
2911static struct link *link_find_link(const char *name, struct tipc_node **node)
2912{
2913        struct link_name link_name_parts;
2914        struct bearer *b_ptr;
2915        struct link *l_ptr;
2916
2917        if (!link_name_validate(name, &link_name_parts))
2918                return NULL;
2919
2920        b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2921        if (!b_ptr)
2922                return NULL;
2923
2924        *node = tipc_node_find(link_name_parts.addr_peer);
2925        if (!*node)
2926                return NULL;
2927
2928        l_ptr = (*node)->links[b_ptr->identity];
2929        if (!l_ptr || strcmp(l_ptr->name, name))
2930                return NULL;
2931
2932        return l_ptr;
2933}
2934
2935struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2936                                     u16 cmd)
2937{
2938        struct tipc_link_config *args;
2939        u32 new_value;
2940        struct link *l_ptr;
2941        struct tipc_node *node;
2942        int res;
2943
2944        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2945                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2946
2947        args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2948        new_value = ntohl(args->value);
2949
2950        if (!strcmp(args->name, tipc_bclink_name)) {
2951                if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2952                    (tipc_bclink_set_queue_limits(new_value) == 0))
2953                        return tipc_cfg_reply_none();
2954                return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2955                                                   " (cannot change setting on broadcast link)");
2956        }
2957
2958        read_lock_bh(&tipc_net_lock);
2959        l_ptr = link_find_link(args->name, &node);
2960        if (!l_ptr) {
2961                read_unlock_bh(&tipc_net_lock);
2962                return tipc_cfg_reply_error_string("link not found");
2963        }
2964
2965        tipc_node_lock(node);
2966        res = -EINVAL;
2967        switch (cmd) {
2968        case TIPC_CMD_SET_LINK_TOL:
2969                if ((new_value >= TIPC_MIN_LINK_TOL) &&
2970                    (new_value <= TIPC_MAX_LINK_TOL)) {
2971                        link_set_supervision_props(l_ptr, new_value);
2972                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2973                                                 0, 0, new_value, 0, 0);
2974                        res = 0;
2975                }
2976                break;
2977        case TIPC_CMD_SET_LINK_PRI:
2978                if ((new_value >= TIPC_MIN_LINK_PRI) &&
2979                    (new_value <= TIPC_MAX_LINK_PRI)) {
2980                        l_ptr->priority = new_value;
2981                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2982                                                 0, 0, 0, new_value, 0);
2983                        res = 0;
2984                }
2985                break;
2986        case TIPC_CMD_SET_LINK_WINDOW:
2987                if ((new_value >= TIPC_MIN_LINK_WIN) &&
2988                    (new_value <= TIPC_MAX_LINK_WIN)) {
2989                        tipc_link_set_queue_limits(l_ptr, new_value);
2990                        res = 0;
2991                }
2992                break;
2993        }
2994        tipc_node_unlock(node);
2995
2996        read_unlock_bh(&tipc_net_lock);
2997        if (res)
2998                return tipc_cfg_reply_error_string("cannot change link setting");
2999
3000        return tipc_cfg_reply_none();
3001}
3002
3003/**
3004 * link_reset_statistics - reset link statistics
3005 * @l_ptr: pointer to link
3006 */
3007
3008static void link_reset_statistics(struct link *l_ptr)
3009{
3010        memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
3011        l_ptr->stats.sent_info = l_ptr->next_out_no;
3012        l_ptr->stats.recv_info = l_ptr->next_in_no;
3013}
3014
3015struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
3016{
3017        char *link_name;
3018        struct link *l_ptr;
3019        struct tipc_node *node;
3020
3021        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3022                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3023
3024        link_name = (char *)TLV_DATA(req_tlv_area);
3025        if (!strcmp(link_name, tipc_bclink_name)) {
3026                if (tipc_bclink_reset_stats())
3027                        return tipc_cfg_reply_error_string("link not found");
3028                return tipc_cfg_reply_none();
3029        }
3030
3031        read_lock_bh(&tipc_net_lock);
3032        l_ptr = link_find_link(link_name, &node);
3033        if (!l_ptr) {
3034                read_unlock_bh(&tipc_net_lock);
3035                return tipc_cfg_reply_error_string("link not found");
3036        }
3037
3038        tipc_node_lock(node);
3039        link_reset_statistics(l_ptr);
3040        tipc_node_unlock(node);
3041        read_unlock_bh(&tipc_net_lock);
3042        return tipc_cfg_reply_none();
3043}
3044
3045/**
3046 * percent - convert count to a percentage of total (rounding up or down)
3047 */
3048
3049static u32 percent(u32 count, u32 total)
3050{
3051        return (count * 100 + (total / 2)) / total;
3052}
3053
3054/**
3055 * tipc_link_stats - print link statistics
3056 * @name: link name
3057 * @buf: print buffer area
3058 * @buf_size: size of print buffer area
3059 *
3060 * Returns length of print buffer data string (or 0 if error)
3061 */
3062
3063static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
3064{
3065        struct print_buf pb;
3066        struct link *l_ptr;
3067        struct tipc_node *node;
3068        char *status;
3069        u32 profile_total = 0;
3070
3071        if (!strcmp(name, tipc_bclink_name))
3072                return tipc_bclink_stats(buf, buf_size);
3073
3074        tipc_printbuf_init(&pb, buf, buf_size);
3075
3076        read_lock_bh(&tipc_net_lock);
3077        l_ptr = link_find_link(name, &node);
3078        if (!l_ptr) {
3079                read_unlock_bh(&tipc_net_lock);
3080                return 0;
3081        }
3082        tipc_node_lock(node);
3083
3084        if (tipc_link_is_active(l_ptr))
3085                status = "ACTIVE";
3086        else if (tipc_link_is_up(l_ptr))
3087                status = "STANDBY";
3088        else
3089                status = "DEFUNCT";
3090        tipc_printf(&pb, "Link <%s>\n"
3091                         "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
3092                         "  Window:%u packets\n",
3093                    l_ptr->name, status, l_ptr->max_pkt,
3094                    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
3095        tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
3096                    l_ptr->next_in_no - l_ptr->stats.recv_info,
3097                    l_ptr->stats.recv_fragments,
3098                    l_ptr->stats.recv_fragmented,
3099                    l_ptr->stats.recv_bundles,
3100                    l_ptr->stats.recv_bundled);
3101        tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
3102                    l_ptr->next_out_no - l_ptr->stats.sent_info,
3103                    l_ptr->stats.sent_fragments,
3104                    l_ptr->stats.sent_fragmented,
3105                    l_ptr->stats.sent_bundles,
3106                    l_ptr->stats.sent_bundled);
3107        profile_total = l_ptr->stats.msg_length_counts;
3108        if (!profile_total)
3109                profile_total = 1;
3110        tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3111                         "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3112                         "-16354:%u%% -32768:%u%% -66000:%u%%\n",
3113                    l_ptr->stats.msg_length_counts,
3114                    l_ptr->stats.msg_lengths_total / profile_total,
3115                    percent(l_ptr->stats.msg_length_profile[0], profile_total),
3116                    percent(l_ptr->stats.msg_length_profile[1], profile_total),
3117                    percent(l_ptr->stats.msg_length_profile[2], profile_total),
3118                    percent(l_ptr->stats.msg_length_profile[3], profile_total),
3119                    percent(l_ptr->stats.msg_length_profile[4], profile_total),
3120                    percent(l_ptr->stats.msg_length_profile[5], profile_total),
3121                    percent(l_ptr->stats.msg_length_profile[6], profile_total));
3122        tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3123                    l_ptr->stats.recv_states,
3124                    l_ptr->stats.recv_probes,
3125                    l_ptr->stats.recv_nacks,
3126                    l_ptr->stats.deferred_recv,
3127                    l_ptr->stats.duplicates);
3128        tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3129                    l_ptr->stats.sent_states,
3130                    l_ptr->stats.sent_probes,
3131                    l_ptr->stats.sent_nacks,
3132                    l_ptr->stats.sent_acks,
3133                    l_ptr->stats.retransmitted);
3134        tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3135                    l_ptr->stats.bearer_congs,
3136                    l_ptr->stats.link_congs,
3137                    l_ptr->stats.max_queue_sz,
3138                    l_ptr->stats.queue_sz_counts
3139                    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3140                    : 0);
3141
3142        tipc_node_unlock(node);
3143        read_unlock_bh(&tipc_net_lock);
3144        return tipc_printbuf_validate(&pb);
3145}
3146
3147#define MAX_LINK_STATS_INFO 2000
3148
3149struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3150{
3151        struct sk_buff *buf;
3152        struct tlv_desc *rep_tlv;
3153        int str_len;
3154
3155        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3156                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3157
3158        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3159        if (!buf)
3160                return NULL;
3161
3162        rep_tlv = (struct tlv_desc *)buf->data;
3163
3164        str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3165                                  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3166        if (!str_len) {
3167                buf_discard(buf);
3168                return tipc_cfg_reply_error_string("link not found");
3169        }
3170
3171        skb_put(buf, TLV_SPACE(str_len));
3172        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3173
3174        return buf;
3175}
3176
3177#if 0
3178int link_control(const char *name, u32 op, u32 val)
3179{
3180        int res = -EINVAL;
3181        struct link *l_ptr;
3182        u32 bearer_id;
3183        struct tipc_node * node;
3184        u32 a;
3185
3186        a = link_name2addr(name, &bearer_id);
3187        read_lock_bh(&tipc_net_lock);
3188        node = tipc_node_find(a);
3189        if (node) {
3190                tipc_node_lock(node);
3191                l_ptr = node->links[bearer_id];
3192                if (l_ptr) {
3193                        if (op == TIPC_REMOVE_LINK) {
3194                                struct bearer *b_ptr = l_ptr->b_ptr;
3195                                spin_lock_bh(&b_ptr->publ.lock);
3196                                tipc_link_delete(l_ptr);
3197                                spin_unlock_bh(&b_ptr->publ.lock);
3198                        }
3199                        if (op == TIPC_CMD_BLOCK_LINK) {
3200                                tipc_link_reset(l_ptr);
3201                                l_ptr->blocked = 1;
3202                        }
3203                        if (op == TIPC_CMD_UNBLOCK_LINK) {
3204                                l_ptr->blocked = 0;
3205                        }
3206                        res = 0;
3207                }
3208                tipc_node_unlock(node);
3209        }
3210        read_unlock_bh(&tipc_net_lock);
3211        return res;
3212}
3213#endif
3214
3215/**
3216 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3217 * @dest: network address of destination node
3218 * @selector: used to select from set of active links
3219 *
3220 * If no active link can be found, uses default maximum packet size.
3221 */
3222
3223u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3224{
3225        struct tipc_node *n_ptr;
3226        struct link *l_ptr;
3227        u32 res = MAX_PKT_DEFAULT;
3228
3229        if (dest == tipc_own_addr)
3230                return MAX_MSG_SIZE;
3231
3232        read_lock_bh(&tipc_net_lock);
3233        n_ptr = tipc_node_select(dest, selector);
3234        if (n_ptr) {
3235                tipc_node_lock(n_ptr);
3236                l_ptr = n_ptr->active_links[selector & 1];
3237                if (l_ptr)
3238                        res = l_ptr->max_pkt;
3239                tipc_node_unlock(n_ptr);
3240        }
3241        read_unlock_bh(&tipc_net_lock);
3242        return res;
3243}
3244
3245#if 0
3246static void link_dump_rec_queue(struct link *l_ptr)
3247{
3248        struct sk_buff *crs;
3249
3250        if (!l_ptr->oldest_deferred_in) {
3251                info("Reception queue empty\n");
3252                return;
3253        }
3254        info("Contents of Reception queue:\n");
3255        crs = l_ptr->oldest_deferred_in;
3256        while (crs) {
3257                if (crs->data == (void *)0x0000a3a3) {
3258                        info("buffer %x invalid\n", crs);
3259                        return;
3260                }
3261                msg_dbg(buf_msg(crs), "In rec queue:\n");
3262                crs = crs->next;
3263        }
3264}
3265#endif
3266
3267static void link_dump_send_queue(struct link *l_ptr)
3268{
3269        if (l_ptr->next_out) {
3270                info("\nContents of unsent queue:\n");
3271                dbg_print_buf_chain(l_ptr->next_out);
3272        }
3273        info("\nContents of send queue:\n");
3274        if (l_ptr->first_out) {
3275                dbg_print_buf_chain(l_ptr->first_out);
3276        }
3277        info("Empty send queue\n");
3278}
3279
3280static void link_print(struct link *l_ptr, struct print_buf *buf,
3281                       const char *str)
3282{
3283        tipc_printf(buf, str);
3284        if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3285                return;
3286        tipc_printf(buf, "Link %x<%s>:",
3287                    l_ptr->addr, l_ptr->b_ptr->publ.name);
3288        tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3289        tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3290        tipc_printf(buf, "SQUE");
3291        if (l_ptr->first_out) {
3292                tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3293                if (l_ptr->next_out)
3294                        tipc_printf(buf, "%u..",
3295                                    msg_seqno(buf_msg(l_ptr->next_out)));
3296                tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3297                if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3298                         msg_seqno(buf_msg(l_ptr->first_out)))
3299                     != (l_ptr->out_queue_size - 1)) ||
3300                    (l_ptr->last_out->next != NULL)) {
3301                        tipc_printf(buf, "\nSend queue inconsistency\n");
3302                        tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3303                        tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3304                        tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3305                        link_dump_send_queue(l_ptr);
3306                }
3307        } else
3308                tipc_printf(buf, "[]");
3309        tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3310        if (l_ptr->oldest_deferred_in) {
3311                u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3312                u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3313                tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3314                if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3315                        tipc_printf(buf, ":RQSIZ(%u)",
3316                                    l_ptr->deferred_inqueue_sz);
3317                }
3318        }
3319        if (link_working_unknown(l_ptr))
3320                tipc_printf(buf, ":WU");
3321        if (link_reset_reset(l_ptr))
3322                tipc_printf(buf, ":RR");
3323        if (link_reset_unknown(l_ptr))
3324                tipc_printf(buf, ":RU");
3325        if (link_working_working(l_ptr))
3326                tipc_printf(buf, ":WW");
3327        tipc_printf(buf, "\n");
3328}
3329
3330