linux/net/tipc/link.c
<<
>>
Prefs
   1/*
   2 * net/tipc/link.c: TIPC link code
   3 *
   4 * Copyright (c) 1996-2007, Ericsson AB
   5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "link.h"
  39#include "port.h"
  40#include "name_distr.h"
  41#include "discover.h"
  42#include "config.h"
  43
  44
/*
 * Out-of-range value for link session numbers.
 * Session numbers written by this file are always masked with 0xffff,
 * so 0x10000 can never equal a real session number.
 */

#define INVALID_SESSION 0x10000

/*
 * Link state events:
 * These are handled by the same switch statements as the RESET_MSG and
 * ACTIVATE_MSG message types in link_state_event(), so their values must
 * not collide with those message type values.
 */

#define  STARTING_EVT    856384768      /* link processing trigger */
#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
#define  TIMEOUT_EVT     560817u        /* link timer expired */

/*
 * The following two 'message types' are really just implementation
 * data conveniently stored in the message header.
 * They must not be considered part of the protocol.
 */
#define OPEN_MSG   0
#define CLOSED_MSG 1

/*
 * State value stored in 'exp_msg_count'
 * (set by tipc_link_reset() when a link changeover is to be started)
 */

#define START_CHANGEOVER 100000u
  72
/**
 * struct tipc_link_name - deconstructed link name
 * @addr_local: network address of node at this end
 * @if_local: name of interface at this end
 * @addr_peer: network address of node at far end
 * @if_peer: name of interface at far end
 *
 * Filled in by link_name_validate() when the caller asks for the
 * individual components of a link name string.
 */

struct tipc_link_name {
        u32 addr_local;
        char if_local[TIPC_MAX_IF_NAME];
        u32 addr_peer;
        char if_peer[TIPC_MAX_IF_NAME];
};
  87
/*
 * Forward declarations of file-local routines that are used before
 * they are defined.
 *
 * Note: link_state_event() is declared here with a 'u32' event argument
 * while its definition spells the type 'unsigned'.
 */
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf);
static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
                                     struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static int  link_send_sections_long(struct tipc_port *sender,
                                    struct iovec const *msg_sect,
                                    u32 num_sect, unsigned int total_len,
                                    u32 destnode);
static void link_check_defragm_bufs(struct tipc_link *l_ptr);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void link_start(struct tipc_link *l_ptr);
static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 104
 105/*
 106 *  Simple link routines
 107 */
 108
 109static unsigned int align(unsigned int i)
 110{
 111        return (i + 3) & ~3u;
 112}
 113
 114static void link_init_max_pkt(struct tipc_link *l_ptr)
 115{
 116        u32 max_pkt;
 117
 118        max_pkt = (l_ptr->b_ptr->mtu & ~3);
 119        if (max_pkt > MAX_MSG_SIZE)
 120                max_pkt = MAX_MSG_SIZE;
 121
 122        l_ptr->max_pkt_target = max_pkt;
 123        if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
 124                l_ptr->max_pkt = l_ptr->max_pkt_target;
 125        else
 126                l_ptr->max_pkt = MAX_PKT_DEFAULT;
 127
 128        l_ptr->max_pkt_probes = 0;
 129}
 130
 131static u32 link_next_sent(struct tipc_link *l_ptr)
 132{
 133        if (l_ptr->next_out)
 134                return buf_seqno(l_ptr->next_out);
 135        return mod(l_ptr->next_out_no);
 136}
 137
 138static u32 link_last_sent(struct tipc_link *l_ptr)
 139{
 140        return mod(link_next_sent(l_ptr) - 1);
 141}
 142
 143/*
 144 *  Simple non-static link routines (i.e. referenced outside this file)
 145 */
 146
 147int tipc_link_is_up(struct tipc_link *l_ptr)
 148{
 149        if (!l_ptr)
 150                return 0;
 151        return link_working_working(l_ptr) || link_working_unknown(l_ptr);
 152}
 153
 154int tipc_link_is_active(struct tipc_link *l_ptr)
 155{
 156        return  (l_ptr->owner->active_links[0] == l_ptr) ||
 157                (l_ptr->owner->active_links[1] == l_ptr);
 158}
 159
 160/**
 161 * link_name_validate - validate & (optionally) deconstruct tipc_link name
 162 * @name - ptr to link name string
 163 * @name_parts - ptr to area for link name components (or NULL if not needed)
 164 *
 165 * Returns 1 if link name is valid, otherwise 0.
 166 */
 167
 168static int link_name_validate(const char *name,
 169                                struct tipc_link_name *name_parts)
 170{
 171        char name_copy[TIPC_MAX_LINK_NAME];
 172        char *addr_local;
 173        char *if_local;
 174        char *addr_peer;
 175        char *if_peer;
 176        char dummy;
 177        u32 z_local, c_local, n_local;
 178        u32 z_peer, c_peer, n_peer;
 179        u32 if_local_len;
 180        u32 if_peer_len;
 181
 182        /* copy link name & ensure length is OK */
 183
 184        name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
 185        /* need above in case non-Posix strncpy() doesn't pad with nulls */
 186        strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
 187        if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
 188                return 0;
 189
 190        /* ensure all component parts of link name are present */
 191
 192        addr_local = name_copy;
 193        if_local = strchr(addr_local, ':');
 194        if (if_local == NULL)
 195                return 0;
 196        *(if_local++) = 0;
 197        addr_peer = strchr(if_local, '-');
 198        if (addr_peer == NULL)
 199                return 0;
 200        *(addr_peer++) = 0;
 201        if_local_len = addr_peer - if_local;
 202        if_peer = strchr(addr_peer, ':');
 203        if (if_peer == NULL)
 204                return 0;
 205        *(if_peer++) = 0;
 206        if_peer_len = strlen(if_peer) + 1;
 207
 208        /* validate component parts of link name */
 209
 210        if ((sscanf(addr_local, "%u.%u.%u%c",
 211                    &z_local, &c_local, &n_local, &dummy) != 3) ||
 212            (sscanf(addr_peer, "%u.%u.%u%c",
 213                    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
 214            (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
 215            (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
 216            (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
 217            (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
 218            (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
 219            (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
 220                return 0;
 221
 222        /* return link name components, if necessary */
 223
 224        if (name_parts) {
 225                name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
 226                strcpy(name_parts->if_local, if_local);
 227                name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
 228                strcpy(name_parts->if_peer, if_peer);
 229        }
 230        return 1;
 231}
 232
 233/**
 234 * link_timeout - handle expiration of link timer
 235 * @l_ptr: pointer to link
 236 *
 237 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
 238 * with tipc_link_delete().  (There is no risk that the node will be deleted by
 239 * another thread because tipc_link_delete() always cancels the link timer before
 240 * tipc_node_delete() is called.)
 241 */
 242
 243static void link_timeout(struct tipc_link *l_ptr)
 244{
 245        tipc_node_lock(l_ptr->owner);
 246
 247        /* update counters used in statistical profiling of send traffic */
 248
 249        l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
 250        l_ptr->stats.queue_sz_counts++;
 251
 252        if (l_ptr->first_out) {
 253                struct tipc_msg *msg = buf_msg(l_ptr->first_out);
 254                u32 length = msg_size(msg);
 255
 256                if ((msg_user(msg) == MSG_FRAGMENTER) &&
 257                    (msg_type(msg) == FIRST_FRAGMENT)) {
 258                        length = msg_size(msg_get_wrapped(msg));
 259                }
 260                if (length) {
 261                        l_ptr->stats.msg_lengths_total += length;
 262                        l_ptr->stats.msg_length_counts++;
 263                        if (length <= 64)
 264                                l_ptr->stats.msg_length_profile[0]++;
 265                        else if (length <= 256)
 266                                l_ptr->stats.msg_length_profile[1]++;
 267                        else if (length <= 1024)
 268                                l_ptr->stats.msg_length_profile[2]++;
 269                        else if (length <= 4096)
 270                                l_ptr->stats.msg_length_profile[3]++;
 271                        else if (length <= 16384)
 272                                l_ptr->stats.msg_length_profile[4]++;
 273                        else if (length <= 32768)
 274                                l_ptr->stats.msg_length_profile[5]++;
 275                        else
 276                                l_ptr->stats.msg_length_profile[6]++;
 277                }
 278        }
 279
 280        /* do all other link processing performed on a periodic basis */
 281
 282        link_check_defragm_bufs(l_ptr);
 283
 284        link_state_event(l_ptr, TIMEOUT_EVT);
 285
 286        if (l_ptr->next_out)
 287                tipc_link_push_queue(l_ptr);
 288
 289        tipc_node_unlock(l_ptr->owner);
 290}
 291
/*
 * link_set_timer - (re)start the link's timer
 * @l_ptr: pointer to link
 * @time: value handed unchanged to k_start_timer()
 *
 * The timer's handler is link_timeout(), as registered in
 * tipc_link_create().
 */
static void link_set_timer(struct tipc_link *l_ptr, u32 time)
{
        k_start_timer(&l_ptr->timer, time);
}
 296
 297/**
 298 * tipc_link_create - create a new link
 299 * @n_ptr: pointer to associated node
 300 * @b_ptr: pointer to associated bearer
 301 * @media_addr: media address to use when sending messages over link
 302 *
 303 * Returns pointer to link.
 304 */
 305
 306struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 307                              struct tipc_bearer *b_ptr,
 308                              const struct tipc_media_addr *media_addr)
 309{
 310        struct tipc_link *l_ptr;
 311        struct tipc_msg *msg;
 312        char *if_name;
 313        char addr_string[16];
 314        u32 peer = n_ptr->addr;
 315
 316        if (n_ptr->link_cnt >= 2) {
 317                tipc_addr_string_fill(addr_string, n_ptr->addr);
 318                err("Attempt to establish third link to %s\n", addr_string);
 319                return NULL;
 320        }
 321
 322        if (n_ptr->links[b_ptr->identity]) {
 323                tipc_addr_string_fill(addr_string, n_ptr->addr);
 324                err("Attempt to establish second link on <%s> to %s\n",
 325                    b_ptr->name, addr_string);
 326                return NULL;
 327        }
 328
 329        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 330        if (!l_ptr) {
 331                warn("Link creation failed, no memory\n");
 332                return NULL;
 333        }
 334
 335        l_ptr->addr = peer;
 336        if_name = strchr(b_ptr->name, ':') + 1;
 337        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
 338                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
 339                tipc_node(tipc_own_addr),
 340                if_name,
 341                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 342                /* note: peer i/f name is updated by reset/activate message */
 343        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 344        l_ptr->owner = n_ptr;
 345        l_ptr->checkpoint = 1;
 346        l_ptr->peer_session = INVALID_SESSION;
 347        l_ptr->b_ptr = b_ptr;
 348        link_set_supervision_props(l_ptr, b_ptr->tolerance);
 349        l_ptr->state = RESET_UNKNOWN;
 350
 351        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 352        msg = l_ptr->pmsg;
 353        tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
 354        msg_set_size(msg, sizeof(l_ptr->proto_msg));
 355        msg_set_session(msg, (tipc_random & 0xffff));
 356        msg_set_bearer_id(msg, b_ptr->identity);
 357        strcpy((char *)msg_data(msg), if_name);
 358
 359        l_ptr->priority = b_ptr->priority;
 360        tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 361
 362        link_init_max_pkt(l_ptr);
 363
 364        l_ptr->next_out_no = 1;
 365        INIT_LIST_HEAD(&l_ptr->waiting_ports);
 366
 367        link_reset_statistics(l_ptr);
 368
 369        tipc_node_attach_link(n_ptr, l_ptr);
 370
 371        k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
 372        list_add_tail(&l_ptr->link_list, &b_ptr->links);
 373        tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
 374
 375        return l_ptr;
 376}
 377
 378/**
 379 * tipc_link_delete - delete a link
 380 * @l_ptr: pointer to link
 381 *
 382 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
 383 * This routine must not grab the node lock until after link timer cancellation
 384 * to avoid a potential deadlock situation.
 385 */
 386
 387void tipc_link_delete(struct tipc_link *l_ptr)
 388{
 389        if (!l_ptr) {
 390                err("Attempt to delete non-existent link\n");
 391                return;
 392        }
 393
 394        k_cancel_timer(&l_ptr->timer);
 395
 396        tipc_node_lock(l_ptr->owner);
 397        tipc_link_reset(l_ptr);
 398        tipc_node_detach_link(l_ptr->owner, l_ptr);
 399        tipc_link_stop(l_ptr);
 400        list_del_init(&l_ptr->link_list);
 401        tipc_node_unlock(l_ptr->owner);
 402        k_term_timer(&l_ptr->timer);
 403        kfree(l_ptr);
 404}
 405
/*
 * link_start - deliver STARTING_EVT to a link's state machine
 * @l_ptr: pointer to link
 *
 * Scheduled through tipc_k_signal() by tipc_link_create(); takes the
 * owner node's lock around the state machine call.
 */
static void link_start(struct tipc_link *l_ptr)
{
        tipc_node_lock(l_ptr->owner);
        link_state_event(l_ptr, STARTING_EVT);
        tipc_node_unlock(l_ptr->owner);
}
 412
 413/**
 414 * link_schedule_port - schedule port for deferred sending
 415 * @l_ptr: pointer to link
 416 * @origport: reference to sending port
 417 * @sz: amount of data to be sent
 418 *
 419 * Schedules port for renewed sending of messages after link congestion
 420 * has abated.
 421 */
 422
 423static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
 424{
 425        struct tipc_port *p_ptr;
 426
 427        spin_lock_bh(&tipc_port_list_lock);
 428        p_ptr = tipc_port_lock(origport);
 429        if (p_ptr) {
 430                if (!p_ptr->wakeup)
 431                        goto exit;
 432                if (!list_empty(&p_ptr->wait_list))
 433                        goto exit;
 434                p_ptr->congested = 1;
 435                p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
 436                list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 437                l_ptr->stats.link_congs++;
 438exit:
 439                tipc_port_unlock(p_ptr);
 440        }
 441        spin_unlock_bh(&tipc_port_list_lock);
 442        return -ELINKCONG;
 443}
 444
 445void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 446{
 447        struct tipc_port *p_ptr;
 448        struct tipc_port *temp_p_ptr;
 449        int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 450
 451        if (all)
 452                win = 100000;
 453        if (win <= 0)
 454                return;
 455        if (!spin_trylock_bh(&tipc_port_list_lock))
 456                return;
 457        if (link_congested(l_ptr))
 458                goto exit;
 459        list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
 460                                 wait_list) {
 461                if (win <= 0)
 462                        break;
 463                list_del_init(&p_ptr->wait_list);
 464                spin_lock_bh(p_ptr->lock);
 465                p_ptr->congested = 0;
 466                p_ptr->wakeup(p_ptr);
 467                win -= p_ptr->waiting_pkts;
 468                spin_unlock_bh(p_ptr->lock);
 469        }
 470
 471exit:
 472        spin_unlock_bh(&tipc_port_list_lock);
 473}
 474
 475/**
 476 * link_release_outqueue - purge link's outbound message queue
 477 * @l_ptr: pointer to link
 478 */
 479
 480static void link_release_outqueue(struct tipc_link *l_ptr)
 481{
 482        struct sk_buff *buf = l_ptr->first_out;
 483        struct sk_buff *next;
 484
 485        while (buf) {
 486                next = buf->next;
 487                buf_discard(buf);
 488                buf = next;
 489        }
 490        l_ptr->first_out = NULL;
 491        l_ptr->out_queue_size = 0;
 492}
 493
 494/**
 495 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 496 * @l_ptr: pointer to link
 497 */
 498
 499void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 500{
 501        struct sk_buff *buf = l_ptr->defragm_buf;
 502        struct sk_buff *next;
 503
 504        while (buf) {
 505                next = buf->next;
 506                buf_discard(buf);
 507                buf = next;
 508        }
 509        l_ptr->defragm_buf = NULL;
 510}
 511
 512/**
 513 * tipc_link_stop - purge all inbound and outbound messages associated with link
 514 * @l_ptr: pointer to link
 515 */
 516
 517void tipc_link_stop(struct tipc_link *l_ptr)
 518{
 519        struct sk_buff *buf;
 520        struct sk_buff *next;
 521
 522        buf = l_ptr->oldest_deferred_in;
 523        while (buf) {
 524                next = buf->next;
 525                buf_discard(buf);
 526                buf = next;
 527        }
 528
 529        buf = l_ptr->first_out;
 530        while (buf) {
 531                next = buf->next;
 532                buf_discard(buf);
 533                buf = next;
 534        }
 535
 536        tipc_link_reset_fragments(l_ptr);
 537
 538        buf_discard(l_ptr->proto_msg_queue);
 539        l_ptr->proto_msg_queue = NULL;
 540}
 541
 542void tipc_link_reset(struct tipc_link *l_ptr)
 543{
 544        struct sk_buff *buf;
 545        u32 prev_state = l_ptr->state;
 546        u32 checkpoint = l_ptr->next_in_no;
 547        int was_active_link = tipc_link_is_active(l_ptr);
 548
 549        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 550
 551        /* Link is down, accept any session */
 552        l_ptr->peer_session = INVALID_SESSION;
 553
 554        /* Prepare for max packet size negotiation */
 555        link_init_max_pkt(l_ptr);
 556
 557        l_ptr->state = RESET_UNKNOWN;
 558
 559        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
 560                return;
 561
 562        tipc_node_link_down(l_ptr->owner, l_ptr);
 563        tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 564
 565        if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
 566            l_ptr->owner->permit_changeover) {
 567                l_ptr->reset_checkpoint = checkpoint;
 568                l_ptr->exp_msg_count = START_CHANGEOVER;
 569        }
 570
 571        /* Clean up all queues: */
 572
 573        link_release_outqueue(l_ptr);
 574        buf_discard(l_ptr->proto_msg_queue);
 575        l_ptr->proto_msg_queue = NULL;
 576        buf = l_ptr->oldest_deferred_in;
 577        while (buf) {
 578                struct sk_buff *next = buf->next;
 579                buf_discard(buf);
 580                buf = next;
 581        }
 582        if (!list_empty(&l_ptr->waiting_ports))
 583                tipc_link_wakeup_ports(l_ptr, 1);
 584
 585        l_ptr->retransm_queue_head = 0;
 586        l_ptr->retransm_queue_size = 0;
 587        l_ptr->last_out = NULL;
 588        l_ptr->first_out = NULL;
 589        l_ptr->next_out = NULL;
 590        l_ptr->unacked_window = 0;
 591        l_ptr->checkpoint = 1;
 592        l_ptr->next_out_no = 1;
 593        l_ptr->deferred_inqueue_sz = 0;
 594        l_ptr->oldest_deferred_in = NULL;
 595        l_ptr->newest_deferred_in = NULL;
 596        l_ptr->fsm_msg_cnt = 0;
 597        l_ptr->stale_count = 0;
 598        link_reset_statistics(l_ptr);
 599}
 600
 601
 602static void link_activate(struct tipc_link *l_ptr)
 603{
 604        l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
 605        tipc_node_link_up(l_ptr->owner, l_ptr);
 606        tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
 607}
 608
 609/**
 610 * link_state_event - link finite state machine
 611 * @l_ptr: pointer to link
 612 * @event: state machine event to process
 613 */
 614
 615static void link_state_event(struct tipc_link *l_ptr, unsigned event)
 616{
 617        struct tipc_link *other;
 618        u32 cont_intv = l_ptr->continuity_interval;
 619
 620        if (!l_ptr->started && (event != STARTING_EVT))
 621                return;         /* Not yet. */
 622
 623        if (link_blocked(l_ptr)) {
 624                if (event == TIMEOUT_EVT)
 625                        link_set_timer(l_ptr, cont_intv);
 626                return;   /* Changeover going on */
 627        }
 628
 629        switch (l_ptr->state) {
 630        case WORKING_WORKING:
 631                switch (event) {
 632                case TRAFFIC_MSG_EVT:
 633                case ACTIVATE_MSG:
 634                        break;
 635                case TIMEOUT_EVT:
 636                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 637                                l_ptr->checkpoint = l_ptr->next_in_no;
 638                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 639                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 640                                                                 0, 0, 0, 0, 0);
 641                                        l_ptr->fsm_msg_cnt++;
 642                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
 643                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 644                                                                 1, 0, 0, 0, 0);
 645                                        l_ptr->fsm_msg_cnt++;
 646                                }
 647                                link_set_timer(l_ptr, cont_intv);
 648                                break;
 649                        }
 650                        l_ptr->state = WORKING_UNKNOWN;
 651                        l_ptr->fsm_msg_cnt = 0;
 652                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 653                        l_ptr->fsm_msg_cnt++;
 654                        link_set_timer(l_ptr, cont_intv / 4);
 655                        break;
 656                case RESET_MSG:
 657                        info("Resetting link <%s>, requested by peer\n",
 658                             l_ptr->name);
 659                        tipc_link_reset(l_ptr);
 660                        l_ptr->state = RESET_RESET;
 661                        l_ptr->fsm_msg_cnt = 0;
 662                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 663                        l_ptr->fsm_msg_cnt++;
 664                        link_set_timer(l_ptr, cont_intv);
 665                        break;
 666                default:
 667                        err("Unknown link event %u in WW state\n", event);
 668                }
 669                break;
 670        case WORKING_UNKNOWN:
 671                switch (event) {
 672                case TRAFFIC_MSG_EVT:
 673                case ACTIVATE_MSG:
 674                        l_ptr->state = WORKING_WORKING;
 675                        l_ptr->fsm_msg_cnt = 0;
 676                        link_set_timer(l_ptr, cont_intv);
 677                        break;
 678                case RESET_MSG:
 679                        info("Resetting link <%s>, requested by peer "
 680                             "while probing\n", l_ptr->name);
 681                        tipc_link_reset(l_ptr);
 682                        l_ptr->state = RESET_RESET;
 683                        l_ptr->fsm_msg_cnt = 0;
 684                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 685                        l_ptr->fsm_msg_cnt++;
 686                        link_set_timer(l_ptr, cont_intv);
 687                        break;
 688                case TIMEOUT_EVT:
 689                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 690                                l_ptr->state = WORKING_WORKING;
 691                                l_ptr->fsm_msg_cnt = 0;
 692                                l_ptr->checkpoint = l_ptr->next_in_no;
 693                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 694                                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 695                                                                 0, 0, 0, 0, 0);
 696                                        l_ptr->fsm_msg_cnt++;
 697                                }
 698                                link_set_timer(l_ptr, cont_intv);
 699                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
 700                                tipc_link_send_proto_msg(l_ptr, STATE_MSG,
 701                                                         1, 0, 0, 0, 0);
 702                                l_ptr->fsm_msg_cnt++;
 703                                link_set_timer(l_ptr, cont_intv / 4);
 704                        } else {        /* Link has failed */
 705                                warn("Resetting link <%s>, peer not responding\n",
 706                                     l_ptr->name);
 707                                tipc_link_reset(l_ptr);
 708                                l_ptr->state = RESET_UNKNOWN;
 709                                l_ptr->fsm_msg_cnt = 0;
 710                                tipc_link_send_proto_msg(l_ptr, RESET_MSG,
 711                                                         0, 0, 0, 0, 0);
 712                                l_ptr->fsm_msg_cnt++;
 713                                link_set_timer(l_ptr, cont_intv);
 714                        }
 715                        break;
 716                default:
 717                        err("Unknown link event %u in WU state\n", event);
 718                }
 719                break;
 720        case RESET_UNKNOWN:
 721                switch (event) {
 722                case TRAFFIC_MSG_EVT:
 723                        break;
 724                case ACTIVATE_MSG:
 725                        other = l_ptr->owner->active_links[0];
 726                        if (other && link_working_unknown(other))
 727                                break;
 728                        l_ptr->state = WORKING_WORKING;
 729                        l_ptr->fsm_msg_cnt = 0;
 730                        link_activate(l_ptr);
 731                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 732                        l_ptr->fsm_msg_cnt++;
 733                        link_set_timer(l_ptr, cont_intv);
 734                        break;
 735                case RESET_MSG:
 736                        l_ptr->state = RESET_RESET;
 737                        l_ptr->fsm_msg_cnt = 0;
 738                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
 739                        l_ptr->fsm_msg_cnt++;
 740                        link_set_timer(l_ptr, cont_intv);
 741                        break;
 742                case STARTING_EVT:
 743                        l_ptr->started = 1;
 744                        /* fall through */
 745                case TIMEOUT_EVT:
 746                        tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
 747                        l_ptr->fsm_msg_cnt++;
 748                        link_set_timer(l_ptr, cont_intv);
 749                        break;
 750                default:
 751                        err("Unknown link event %u in RU state\n", event);
 752                }
 753                break;
 754        case RESET_RESET:
 755                switch (event) {
 756                case TRAFFIC_MSG_EVT:
 757                case ACTIVATE_MSG:
 758                        other = l_ptr->owner->active_links[0];
 759                        if (other && link_working_unknown(other))
 760                                break;
 761                        l_ptr->state = WORKING_WORKING;
 762                        l_ptr->fsm_msg_cnt = 0;
 763                        link_activate(l_ptr);
 764                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 765                        l_ptr->fsm_msg_cnt++;
 766                        link_set_timer(l_ptr, cont_intv);
 767                        break;
 768                case RESET_MSG:
 769                        break;
 770                case TIMEOUT_EVT:
 771                        tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
 772                        l_ptr->fsm_msg_cnt++;
 773                        link_set_timer(l_ptr, cont_intv);
 774                        break;
 775                default:
 776                        err("Unknown link event %u in RR state\n", event);
 777                }
 778                break;
 779        default:
 780                err("Unknown link state %u/%u\n", l_ptr->state, event);
 781        }
 782}
 783
 784/*
 785 * link_bundle_buf(): Append contents of a buffer to
 786 * the tail of an existing one.
 787 */
 788
 789static int link_bundle_buf(struct tipc_link *l_ptr,
 790                           struct sk_buff *bundler,
 791                           struct sk_buff *buf)
 792{
 793        struct tipc_msg *bundler_msg = buf_msg(bundler);
 794        struct tipc_msg *msg = buf_msg(buf);
 795        u32 size = msg_size(msg);
 796        u32 bundle_size = msg_size(bundler_msg);
 797        u32 to_pos = align(bundle_size);
 798        u32 pad = to_pos - bundle_size;
 799
 800        if (msg_user(bundler_msg) != MSG_BUNDLER)
 801                return 0;
 802        if (msg_type(bundler_msg) != OPEN_MSG)
 803                return 0;
 804        if (skb_tailroom(bundler) < (pad + size))
 805                return 0;
 806        if (l_ptr->max_pkt < (to_pos + size))
 807                return 0;
 808
 809        skb_put(bundler, pad + size);
 810        skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
 811        msg_set_size(bundler_msg, to_pos + size);
 812        msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
 813        buf_discard(buf);
 814        l_ptr->stats.sent_bundled++;
 815        return 1;
 816}
 817
 818static void link_add_to_outqueue(struct tipc_link *l_ptr,
 819                                 struct sk_buff *buf,
 820                                 struct tipc_msg *msg)
 821{
 822        u32 ack = mod(l_ptr->next_in_no - 1);
 823        u32 seqno = mod(l_ptr->next_out_no++);
 824
 825        msg_set_word(msg, 2, ((ack << 16) | seqno));
 826        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 827        buf->next = NULL;
 828        if (l_ptr->first_out) {
 829                l_ptr->last_out->next = buf;
 830                l_ptr->last_out = buf;
 831        } else
 832                l_ptr->first_out = l_ptr->last_out = buf;
 833
 834        l_ptr->out_queue_size++;
 835        if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
 836                l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
 837}
 838
 839static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
 840                                       struct sk_buff *buf_chain,
 841                                       u32 long_msgno)
 842{
 843        struct sk_buff *buf;
 844        struct tipc_msg *msg;
 845
 846        if (!l_ptr->next_out)
 847                l_ptr->next_out = buf_chain;
 848        while (buf_chain) {
 849                buf = buf_chain;
 850                buf_chain = buf_chain->next;
 851
 852                msg = buf_msg(buf);
 853                msg_set_long_msgno(msg, long_msgno);
 854                link_add_to_outqueue(l_ptr, buf, msg);
 855        }
 856}
 857
 858/*
 859 * tipc_link_send_buf() is the 'full path' for messages, called from
 860 * inside TIPC when the 'fast path' in tipc_send_buf
 861 * has failed, and from link_send()
 862 */
 863
 864int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 865{
 866        struct tipc_msg *msg = buf_msg(buf);
 867        u32 size = msg_size(msg);
 868        u32 dsz = msg_data_sz(msg);
 869        u32 queue_size = l_ptr->out_queue_size;
 870        u32 imp = tipc_msg_tot_importance(msg);
 871        u32 queue_limit = l_ptr->queue_limit[imp];
 872        u32 max_packet = l_ptr->max_pkt;
 873
 874        msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
 875
 876        /* Match msg importance against queue limits: */
 877
 878        if (unlikely(queue_size >= queue_limit)) {
 879                if (imp <= TIPC_CRITICAL_IMPORTANCE) {
 880                        link_schedule_port(l_ptr, msg_origport(msg), size);
 881                        buf_discard(buf);
 882                        return -ELINKCONG;
 883                }
 884                buf_discard(buf);
 885                if (imp > CONN_MANAGER) {
 886                        warn("Resetting link <%s>, send queue full", l_ptr->name);
 887                        tipc_link_reset(l_ptr);
 888                }
 889                return dsz;
 890        }
 891
 892        /* Fragmentation needed ? */
 893
 894        if (size > max_packet)
 895                return link_send_long_buf(l_ptr, buf);
 896
 897        /* Packet can be queued or sent: */
 898
 899        if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
 900                   !link_congested(l_ptr))) {
 901                link_add_to_outqueue(l_ptr, buf, msg);
 902
 903                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
 904                        l_ptr->unacked_window = 0;
 905                } else {
 906                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
 907                        l_ptr->stats.bearer_congs++;
 908                        l_ptr->next_out = buf;
 909                }
 910                return dsz;
 911        }
 912        /* Congestion: can message be bundled ?: */
 913
 914        if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
 915            (msg_user(msg) != MSG_FRAGMENTER)) {
 916
 917                /* Try adding message to an existing bundle */
 918
 919                if (l_ptr->next_out &&
 920                    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
 921                        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
 922                        return dsz;
 923                }
 924
 925                /* Try creating a new bundle */
 926
 927                if (size <= max_packet * 2 / 3) {
 928                        struct sk_buff *bundler = tipc_buf_acquire(max_packet);
 929                        struct tipc_msg bundler_hdr;
 930
 931                        if (bundler) {
 932                                tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
 933                                         INT_H_SIZE, l_ptr->addr);
 934                                skb_copy_to_linear_data(bundler, &bundler_hdr,
 935                                                        INT_H_SIZE);
 936                                skb_trim(bundler, INT_H_SIZE);
 937                                link_bundle_buf(l_ptr, bundler, buf);
 938                                buf = bundler;
 939                                msg = buf_msg(buf);
 940                                l_ptr->stats.sent_bundles++;
 941                        }
 942                }
 943        }
 944        if (!l_ptr->next_out)
 945                l_ptr->next_out = buf;
 946        link_add_to_outqueue(l_ptr, buf, msg);
 947        tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
 948        return dsz;
 949}
 950
 951/*
 952 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
 953 * not been selected yet, and the the owner node is not locked
 954 * Called by TIPC internal users, e.g. the name distributor
 955 */
 956
 957int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
 958{
 959        struct tipc_link *l_ptr;
 960        struct tipc_node *n_ptr;
 961        int res = -ELINKCONG;
 962
 963        read_lock_bh(&tipc_net_lock);
 964        n_ptr = tipc_node_find(dest);
 965        if (n_ptr) {
 966                tipc_node_lock(n_ptr);
 967                l_ptr = n_ptr->active_links[selector & 1];
 968                if (l_ptr)
 969                        res = tipc_link_send_buf(l_ptr, buf);
 970                else
 971                        buf_discard(buf);
 972                tipc_node_unlock(n_ptr);
 973        } else {
 974                buf_discard(buf);
 975        }
 976        read_unlock_bh(&tipc_net_lock);
 977        return res;
 978}
 979
 980/*
 981 * tipc_link_send_names - send name table entries to new neighbor
 982 *
 983 * Send routine for bulk delivery of name table messages when contact
 984 * with a new neighbor occurs. No link congestion checking is performed
 985 * because name table messages *must* be delivered. The messages must be
 986 * small enough not to require fragmentation.
 987 * Called without any locks held.
 988 */
 989
 990void tipc_link_send_names(struct list_head *message_list, u32 dest)
 991{
 992        struct tipc_node *n_ptr;
 993        struct tipc_link *l_ptr;
 994        struct sk_buff *buf;
 995        struct sk_buff *temp_buf;
 996
 997        if (list_empty(message_list))
 998                return;
 999
1000        read_lock_bh(&tipc_net_lock);
1001        n_ptr = tipc_node_find(dest);
1002        if (n_ptr) {
1003                tipc_node_lock(n_ptr);
1004                l_ptr = n_ptr->active_links[0];
1005                if (l_ptr) {
1006                        /* convert circular list to linear list */
1007                        ((struct sk_buff *)message_list->prev)->next = NULL;
1008                        link_add_chain_to_outqueue(l_ptr,
1009                                (struct sk_buff *)message_list->next, 0);
1010                        tipc_link_push_queue(l_ptr);
1011                        INIT_LIST_HEAD(message_list);
1012                }
1013                tipc_node_unlock(n_ptr);
1014        }
1015        read_unlock_bh(&tipc_net_lock);
1016
1017        /* discard the messages if they couldn't be sent */
1018
1019        list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
1020                list_del((struct list_head *)buf);
1021                buf_discard(buf);
1022        }
1023}
1024
1025/*
1026 * link_send_buf_fast: Entry for data messages where the
1027 * destination link is known and the header is complete,
1028 * inclusive total message length. Very time critical.
1029 * Link is locked. Returns user data length.
1030 */
1031
1032static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
1033                              u32 *used_max_pkt)
1034{
1035        struct tipc_msg *msg = buf_msg(buf);
1036        int res = msg_data_sz(msg);
1037
1038        if (likely(!link_congested(l_ptr))) {
1039                if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1040                        if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1041                                link_add_to_outqueue(l_ptr, buf, msg);
1042                                if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1043                                                            &l_ptr->media_addr))) {
1044                                        l_ptr->unacked_window = 0;
1045                                        return res;
1046                                }
1047                                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1048                                l_ptr->stats.bearer_congs++;
1049                                l_ptr->next_out = buf;
1050                                return res;
1051                        }
1052                } else
1053                        *used_max_pkt = l_ptr->max_pkt;
1054        }
1055        return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1056}
1057
1058/*
1059 * tipc_send_buf_fast: Entry for data messages where the
1060 * destination node is known and the header is complete,
1061 * inclusive total message length.
1062 * Returns user data length.
1063 */
1064int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1065{
1066        struct tipc_link *l_ptr;
1067        struct tipc_node *n_ptr;
1068        int res;
1069        u32 selector = msg_origport(buf_msg(buf)) & 1;
1070        u32 dummy;
1071
1072        read_lock_bh(&tipc_net_lock);
1073        n_ptr = tipc_node_find(destnode);
1074        if (likely(n_ptr)) {
1075                tipc_node_lock(n_ptr);
1076                l_ptr = n_ptr->active_links[selector];
1077                if (likely(l_ptr)) {
1078                        res = link_send_buf_fast(l_ptr, buf, &dummy);
1079                        tipc_node_unlock(n_ptr);
1080                        read_unlock_bh(&tipc_net_lock);
1081                        return res;
1082                }
1083                tipc_node_unlock(n_ptr);
1084        }
1085        read_unlock_bh(&tipc_net_lock);
1086        res = msg_data_sz(buf_msg(buf));
1087        tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1088        return res;
1089}
1090
1091
1092/*
1093 * tipc_link_send_sections_fast: Entry for messages where the
1094 * destination processor is known and the header is complete,
1095 * except for total message length.
1096 * Returns user data length or errno.
1097 */
1098int tipc_link_send_sections_fast(struct tipc_port *sender,
1099                                 struct iovec const *msg_sect,
1100                                 const u32 num_sect,
1101                                 unsigned int total_len,
1102                                 u32 destaddr)
1103{
1104        struct tipc_msg *hdr = &sender->phdr;
1105        struct tipc_link *l_ptr;
1106        struct sk_buff *buf;
1107        struct tipc_node *node;
1108        int res;
1109        u32 selector = msg_origport(hdr) & 1;
1110
1111again:
1112        /*
1113         * Try building message using port's max_pkt hint.
1114         * (Must not hold any locks while building message.)
1115         */
1116
1117        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
1118                             sender->max_pkt, !sender->user_port, &buf);
1119
1120        read_lock_bh(&tipc_net_lock);
1121        node = tipc_node_find(destaddr);
1122        if (likely(node)) {
1123                tipc_node_lock(node);
1124                l_ptr = node->active_links[selector];
1125                if (likely(l_ptr)) {
1126                        if (likely(buf)) {
1127                                res = link_send_buf_fast(l_ptr, buf,
1128                                                         &sender->max_pkt);
1129exit:
1130                                tipc_node_unlock(node);
1131                                read_unlock_bh(&tipc_net_lock);
1132                                return res;
1133                        }
1134
1135                        /* Exit if build request was invalid */
1136
1137                        if (unlikely(res < 0))
1138                                goto exit;
1139
1140                        /* Exit if link (or bearer) is congested */
1141
1142                        if (link_congested(l_ptr) ||
1143                            !list_empty(&l_ptr->b_ptr->cong_links)) {
1144                                res = link_schedule_port(l_ptr,
1145                                                         sender->ref, res);
1146                                goto exit;
1147                        }
1148
1149                        /*
1150                         * Message size exceeds max_pkt hint; update hint,
1151                         * then re-try fast path or fragment the message
1152                         */
1153
1154                        sender->max_pkt = l_ptr->max_pkt;
1155                        tipc_node_unlock(node);
1156                        read_unlock_bh(&tipc_net_lock);
1157
1158
1159                        if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
1160                                goto again;
1161
1162                        return link_send_sections_long(sender, msg_sect,
1163                                                       num_sect, total_len,
1164                                                       destaddr);
1165                }
1166                tipc_node_unlock(node);
1167        }
1168        read_unlock_bh(&tipc_net_lock);
1169
1170        /* Couldn't find a link to the destination node */
1171
1172        if (buf)
1173                return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1174        if (res >= 0)
1175                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1176                                                 total_len, TIPC_ERR_NO_NODE);
1177        return res;
1178}
1179
1180/*
1181 * link_send_sections_long(): Entry for long messages where the
1182 * destination node is known and the header is complete,
1183 * inclusive total message length.
1184 * Link and bearer congestion status have been checked to be ok,
1185 * and are ignored if they change.
1186 *
1187 * Note that fragments do not use the full link MTU so that they won't have
1188 * to undergo refragmentation if link changeover causes them to be sent
1189 * over another link with an additional tunnel header added as prefix.
1190 * (Refragmentation will still occur if the other link has a smaller MTU.)
1191 *
1192 * Returns user data length or errno.
1193 */
1194static int link_send_sections_long(struct tipc_port *sender,
1195                                   struct iovec const *msg_sect,
1196                                   u32 num_sect,
1197                                   unsigned int total_len,
1198                                   u32 destaddr)
1199{
1200        struct tipc_link *l_ptr;
1201        struct tipc_node *node;
1202        struct tipc_msg *hdr = &sender->phdr;
1203        u32 dsz = total_len;
1204        u32 max_pkt, fragm_sz, rest;
1205        struct tipc_msg fragm_hdr;
1206        struct sk_buff *buf, *buf_chain, *prev;
1207        u32 fragm_crs, fragm_rest, hsz, sect_rest;
1208        const unchar *sect_crs;
1209        int curr_sect;
1210        u32 fragm_no;
1211
1212again:
1213        fragm_no = 1;
1214        max_pkt = sender->max_pkt - INT_H_SIZE;
1215                /* leave room for tunnel header in case of link changeover */
1216        fragm_sz = max_pkt - INT_H_SIZE;
1217                /* leave room for fragmentation header in each fragment */
1218        rest = dsz;
1219        fragm_crs = 0;
1220        fragm_rest = 0;
1221        sect_rest = 0;
1222        sect_crs = NULL;
1223        curr_sect = -1;
1224
1225        /* Prepare reusable fragment header: */
1226
1227        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1228                 INT_H_SIZE, msg_destnode(hdr));
1229        msg_set_size(&fragm_hdr, max_pkt);
1230        msg_set_fragm_no(&fragm_hdr, 1);
1231
1232        /* Prepare header of first fragment: */
1233
1234        buf_chain = buf = tipc_buf_acquire(max_pkt);
1235        if (!buf)
1236                return -ENOMEM;
1237        buf->next = NULL;
1238        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1239        hsz = msg_hdr_sz(hdr);
1240        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1241
1242        /* Chop up message: */
1243
1244        fragm_crs = INT_H_SIZE + hsz;
1245        fragm_rest = fragm_sz - hsz;
1246
1247        do {            /* For all sections */
1248                u32 sz;
1249
1250                if (!sect_rest) {
1251                        sect_rest = msg_sect[++curr_sect].iov_len;
1252                        sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1253                }
1254
1255                if (sect_rest < fragm_rest)
1256                        sz = sect_rest;
1257                else
1258                        sz = fragm_rest;
1259
1260                if (likely(!sender->user_port)) {
1261                        if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1262error:
1263                                for (; buf_chain; buf_chain = buf) {
1264                                        buf = buf_chain->next;
1265                                        buf_discard(buf_chain);
1266                                }
1267                                return -EFAULT;
1268                        }
1269                } else
1270                        skb_copy_to_linear_data_offset(buf, fragm_crs,
1271                                                       sect_crs, sz);
1272                sect_crs += sz;
1273                sect_rest -= sz;
1274                fragm_crs += sz;
1275                fragm_rest -= sz;
1276                rest -= sz;
1277
1278                if (!fragm_rest && rest) {
1279
1280                        /* Initiate new fragment: */
1281                        if (rest <= fragm_sz) {
1282                                fragm_sz = rest;
1283                                msg_set_type(&fragm_hdr, LAST_FRAGMENT);
1284                        } else {
1285                                msg_set_type(&fragm_hdr, FRAGMENT);
1286                        }
1287                        msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1288                        msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1289                        prev = buf;
1290                        buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1291                        if (!buf)
1292                                goto error;
1293
1294                        buf->next = NULL;
1295                        prev->next = buf;
1296                        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1297                        fragm_crs = INT_H_SIZE;
1298                        fragm_rest = fragm_sz;
1299                }
1300        } while (rest > 0);
1301
1302        /*
1303         * Now we have a buffer chain. Select a link and check
1304         * that packet size is still OK
1305         */
1306        node = tipc_node_find(destaddr);
1307        if (likely(node)) {
1308                tipc_node_lock(node);
1309                l_ptr = node->active_links[sender->ref & 1];
1310                if (!l_ptr) {
1311                        tipc_node_unlock(node);
1312                        goto reject;
1313                }
1314                if (l_ptr->max_pkt < max_pkt) {
1315                        sender->max_pkt = l_ptr->max_pkt;
1316                        tipc_node_unlock(node);
1317                        for (; buf_chain; buf_chain = buf) {
1318                                buf = buf_chain->next;
1319                                buf_discard(buf_chain);
1320                        }
1321                        goto again;
1322                }
1323        } else {
1324reject:
1325                for (; buf_chain; buf_chain = buf) {
1326                        buf = buf_chain->next;
1327                        buf_discard(buf_chain);
1328                }
1329                return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1330                                                 total_len, TIPC_ERR_NO_NODE);
1331        }
1332
1333        /* Append chain of fragments to send queue & send them */
1334
1335        l_ptr->long_msg_seq_no++;
1336        link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
1337        l_ptr->stats.sent_fragments += fragm_no;
1338        l_ptr->stats.sent_fragmented++;
1339        tipc_link_push_queue(l_ptr);
1340        tipc_node_unlock(node);
1341        return dsz;
1342}
1343
1344/*
1345 * tipc_link_push_packet: Push one unsent packet to the media
1346 */
1347u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1348{
1349        struct sk_buff *buf = l_ptr->first_out;
1350        u32 r_q_size = l_ptr->retransm_queue_size;
1351        u32 r_q_head = l_ptr->retransm_queue_head;
1352
1353        /* Step to position where retransmission failed, if any,    */
1354        /* consider that buffers may have been released in meantime */
1355
1356        if (r_q_size && buf) {
1357                u32 last = lesser(mod(r_q_head + r_q_size),
1358                                  link_last_sent(l_ptr));
1359                u32 first = buf_seqno(buf);
1360
1361                while (buf && less(first, r_q_head)) {
1362                        first = mod(first + 1);
1363                        buf = buf->next;
1364                }
1365                l_ptr->retransm_queue_head = r_q_head = first;
1366                l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1367        }
1368
1369        /* Continue retransmission now, if there is anything: */
1370
1371        if (r_q_size && buf) {
1372                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1373                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1374                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1375                        l_ptr->retransm_queue_head = mod(++r_q_head);
1376                        l_ptr->retransm_queue_size = --r_q_size;
1377                        l_ptr->stats.retransmitted++;
1378                        return 0;
1379                } else {
1380                        l_ptr->stats.bearer_congs++;
1381                        return PUSH_FAILED;
1382                }
1383        }
1384
1385        /* Send deferred protocol message, if any: */
1386
1387        buf = l_ptr->proto_msg_queue;
1388        if (buf) {
1389                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1390                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1391                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1392                        l_ptr->unacked_window = 0;
1393                        buf_discard(buf);
1394                        l_ptr->proto_msg_queue = NULL;
1395                        return 0;
1396                } else {
1397                        l_ptr->stats.bearer_congs++;
1398                        return PUSH_FAILED;
1399                }
1400        }
1401
1402        /* Send one deferred data message, if send window not full: */
1403
1404        buf = l_ptr->next_out;
1405        if (buf) {
1406                struct tipc_msg *msg = buf_msg(buf);
1407                u32 next = msg_seqno(msg);
1408                u32 first = buf_seqno(l_ptr->first_out);
1409
1410                if (mod(next - first) < l_ptr->queue_limit[0]) {
1411                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1412                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1413                        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1414                                if (msg_user(msg) == MSG_BUNDLER)
1415                                        msg_set_type(msg, CLOSED_MSG);
1416                                l_ptr->next_out = buf->next;
1417                                return 0;
1418                        } else {
1419                                l_ptr->stats.bearer_congs++;
1420                                return PUSH_FAILED;
1421                        }
1422                }
1423        }
1424        return PUSH_FINISHED;
1425}
1426
1427/*
1428 * push_queue(): push out the unsent messages of a link where
1429 *               congestion has abated. Node is locked
1430 */
1431void tipc_link_push_queue(struct tipc_link *l_ptr)
1432{
1433        u32 res;
1434
1435        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1436                return;
1437
1438        do {
1439                res = tipc_link_push_packet(l_ptr);
1440        } while (!res);
1441
1442        if (res == PUSH_FAILED)
1443                tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1444}
1445
1446static void link_reset_all(unsigned long addr)
1447{
1448        struct tipc_node *n_ptr;
1449        char addr_string[16];
1450        u32 i;
1451
1452        read_lock_bh(&tipc_net_lock);
1453        n_ptr = tipc_node_find((u32)addr);
1454        if (!n_ptr) {
1455                read_unlock_bh(&tipc_net_lock);
1456                return; /* node no longer exists */
1457        }
1458
1459        tipc_node_lock(n_ptr);
1460
1461        warn("Resetting all links to %s\n",
1462             tipc_addr_string_fill(addr_string, n_ptr->addr));
1463
1464        for (i = 0; i < MAX_BEARERS; i++) {
1465                if (n_ptr->links[i]) {
1466                        link_print(n_ptr->links[i], "Resetting link\n");
1467                        tipc_link_reset(n_ptr->links[i]);
1468                }
1469        }
1470
1471        tipc_node_unlock(n_ptr);
1472        read_unlock_bh(&tipc_net_lock);
1473}
1474
1475static void link_retransmit_failure(struct tipc_link *l_ptr,
1476                                        struct sk_buff *buf)
1477{
1478        struct tipc_msg *msg = buf_msg(buf);
1479
1480        warn("Retransmission failure on link <%s>\n", l_ptr->name);
1481
1482        if (l_ptr->addr) {
1483
1484                /* Handle failure on standard link */
1485
1486                link_print(l_ptr, "Resetting link\n");
1487                tipc_link_reset(l_ptr);
1488
1489        } else {
1490
1491                /* Handle failure on broadcast link */
1492
1493                struct tipc_node *n_ptr;
1494                char addr_string[16];
1495
1496                info("Msg seq number: %u,  ", msg_seqno(msg));
1497                info("Outstanding acks: %lu\n",
1498                     (unsigned long) TIPC_SKB_CB(buf)->handle);
1499
1500                n_ptr = tipc_bclink_retransmit_to();
1501                tipc_node_lock(n_ptr);
1502
1503                tipc_addr_string_fill(addr_string, n_ptr->addr);
1504                info("Multicast link info for %s\n", addr_string);
1505                info("Supported: %d,  ", n_ptr->bclink.supported);
1506                info("Acked: %u\n", n_ptr->bclink.acked);
1507                info("Last in: %u,  ", n_ptr->bclink.last_in);
1508                info("Gap after: %u,  ", n_ptr->bclink.gap_after);
1509                info("Gap to: %u\n", n_ptr->bclink.gap_to);
1510                info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1511
1512                tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1513
1514                tipc_node_unlock(n_ptr);
1515
1516                l_ptr->stale_count = 0;
1517        }
1518}
1519
1520void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1521                          u32 retransmits)
1522{
1523        struct tipc_msg *msg;
1524
1525        if (!buf)
1526                return;
1527
1528        msg = buf_msg(buf);
1529
1530        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1531                if (l_ptr->retransm_queue_size == 0) {
1532                        l_ptr->retransm_queue_head = msg_seqno(msg);
1533                        l_ptr->retransm_queue_size = retransmits;
1534                } else {
1535                        err("Unexpected retransmit on link %s (qsize=%d)\n",
1536                            l_ptr->name, l_ptr->retransm_queue_size);
1537                }
1538                return;
1539        } else {
1540                /* Detect repeated retransmit failures on uncongested bearer */
1541
1542                if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1543                        if (++l_ptr->stale_count > 100) {
1544                                link_retransmit_failure(l_ptr, buf);
1545                                return;
1546                        }
1547                } else {
1548                        l_ptr->last_retransmitted = msg_seqno(msg);
1549                        l_ptr->stale_count = 1;
1550                }
1551        }
1552
1553        while (retransmits && (buf != l_ptr->next_out) && buf) {
1554                msg = buf_msg(buf);
1555                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1556                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1557                if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1558                        buf = buf->next;
1559                        retransmits--;
1560                        l_ptr->stats.retransmitted++;
1561                } else {
1562                        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1563                        l_ptr->stats.bearer_congs++;
1564                        l_ptr->retransm_queue_head = buf_seqno(buf);
1565                        l_ptr->retransm_queue_size = retransmits;
1566                        return;
1567                }
1568        }
1569
1570        l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1571}
1572
1573/**
1574 * link_insert_deferred_queue - insert deferred messages back into receive chain
1575 */
1576
1577static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1578                                                  struct sk_buff *buf)
1579{
1580        u32 seq_no;
1581
1582        if (l_ptr->oldest_deferred_in == NULL)
1583                return buf;
1584
1585        seq_no = buf_seqno(l_ptr->oldest_deferred_in);
1586        if (seq_no == mod(l_ptr->next_in_no)) {
1587                l_ptr->newest_deferred_in->next = buf;
1588                buf = l_ptr->oldest_deferred_in;
1589                l_ptr->oldest_deferred_in = NULL;
1590                l_ptr->deferred_inqueue_sz = 0;
1591        }
1592        return buf;
1593}
1594
1595/**
1596 * link_recv_buf_validate - validate basic format of received message
1597 *
1598 * This routine ensures a TIPC message has an acceptable header, and at least
1599 * as much data as the header indicates it should.  The routine also ensures
1600 * that the entire message header is stored in the main fragment of the message
1601 * buffer, to simplify future access to message header fields.
1602 *
1603 * Note: Having extra info present in the message header or data areas is OK.
1604 * TIPC will ignore the excess, under the assumption that it is optional info
1605 * introduced by a later release of the protocol.
1606 */
1607
1608static int link_recv_buf_validate(struct sk_buff *buf)
1609{
1610        static u32 min_data_hdr_size[8] = {
1611                SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1612                MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1613                };
1614
1615        struct tipc_msg *msg;
1616        u32 tipc_hdr[2];
1617        u32 size;
1618        u32 hdr_size;
1619        u32 min_hdr_size;
1620
1621        if (unlikely(buf->len < MIN_H_SIZE))
1622                return 0;
1623
1624        msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1625        if (msg == NULL)
1626                return 0;
1627
1628        if (unlikely(msg_version(msg) != TIPC_VERSION))
1629                return 0;
1630
1631        size = msg_size(msg);
1632        hdr_size = msg_hdr_sz(msg);
1633        min_hdr_size = msg_isdata(msg) ?
1634                min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1635
1636        if (unlikely((hdr_size < min_hdr_size) ||
1637                     (size < hdr_size) ||
1638                     (buf->len < size) ||
1639                     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1640                return 0;
1641
1642        return pskb_may_pull(buf, hdr_size);
1643}
1644
1645/**
1646 * tipc_recv_msg - process TIPC messages arriving from off-node
1647 * @head: pointer to message buffer chain
1648 * @b_ptr: pointer to bearer message arrived on
1649 *
1650 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1651 * structure (i.e. cannot be NULL), but bearer can be inactive.
     *
     * tipc_net_lock is held for reading while the chain is processed.  Buffers
     * are taken off the front of the chain one at a time; each one is either
     * passed on to a receive routine, put back at the front of the chain for
     * another pass, or discarded at the "cont" label.  The sending node's lock
     * is taken once the node has been located and is released on every path
     * before the next buffer is examined.
1652 */
1653
1654void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1655{
1656        read_lock_bh(&tipc_net_lock);
1657        while (head) {
1658                struct tipc_node *n_ptr;
1659                struct tipc_link *l_ptr;
1660                struct sk_buff *crs;
1661                struct sk_buff *buf = head;
1662                struct tipc_msg *msg;
1663                u32 seq_no;
1664                u32 ackd;
1665                u32 released = 0;
1666                int type;
1667
                    /* Take the first buffer; head now refers to the rest */
1668                head = head->next;
1669
1670                /* Ensure bearer is still enabled */
1671
1672                if (unlikely(!b_ptr->active))
1673                        goto cont;
1674
1675                /* Ensure message is well-formed */
1676
1677                if (unlikely(!link_recv_buf_validate(buf)))
1678                        goto cont;
1679
1680                /* Ensure message data is a single contiguous unit */
1681
1682                if (unlikely(buf_linearize(buf)))
1683                        goto cont;
1684
1685                /* Handle arrival of a non-unicast link message */
1686
1687                msg = buf_msg(buf);
1688
                    /*
                     * NOTE(review): buf is not discarded on this path, so the
                     * two receive routines presumably consume it -- confirm.
                     */
1689                if (unlikely(msg_non_seq(msg))) {
1690                        if (msg_user(msg) ==  LINK_CONFIG)
1691                                tipc_disc_recv_msg(buf, b_ptr);
1692                        else
1693                                tipc_bclink_recv_pkt(buf);
1694                        continue;
1695                }
1696
1697                /* Discard unicast link messages destined for another node */
1698
1699                if (unlikely(!msg_short(msg) &&
1700                             (msg_destnode(msg) != tipc_own_addr)))
1701                        goto cont;
1702
1703                /* Locate neighboring node that sent message */
1704
1705                n_ptr = tipc_node_find(msg_prevnode(msg));
1706                if (unlikely(!n_ptr))
1707                        goto cont;
1708                tipc_node_lock(n_ptr);
1709
1710                /* Locate unicast link endpoint that should handle message */
1711
1712                l_ptr = n_ptr->links[b_ptr->identity];
1713                if (unlikely(!l_ptr)) {
1714                        tipc_node_unlock(n_ptr);
1715                        goto cont;
1716                }
1717
1718                /* Verify that communication with node is currently allowed */
1719
                    /*
                     * A RESET or ACTIVATE protocol message without the redundant
                     * link flag clears a pending WAIT_PEER_DOWN condition
                     */
1720                if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
1721                        msg_user(msg) == LINK_PROTOCOL &&
1722                        (msg_type(msg) == RESET_MSG ||
1723                                        msg_type(msg) == ACTIVATE_MSG) &&
1724                        !msg_redundant_link(msg))
1725                        n_ptr->block_setup &= ~WAIT_PEER_DOWN;
1726
                    /* Any remaining block_setup flag causes the message to be dropped */
1727                if (n_ptr->block_setup) {
1728                        tipc_node_unlock(n_ptr);
1729                        goto cont;
1730                }
1731
1732                /* Validate message sequence number info */
1733
1734                seq_no = msg_seqno(msg);
1735                ackd = msg_ack(msg);
1736
1737                /* Release acked messages */
1738
1739                if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1740                        tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1741
                    /*
                     * Walk the send queue from first_out, stopping at next_out
                     * or at the first buffer whose sequence number is beyond
                     * the acknowledged value
                     */
1742                crs = l_ptr->first_out;
1743                while ((crs != l_ptr->next_out) &&
1744                       less_eq(buf_seqno(crs), ackd)) {
1745                        struct sk_buff *next = crs->next;
1746
1747                        buf_discard(crs);
1748                        crs = next;
1749                        released++;
1750                }
1751                if (released) {
1752                        l_ptr->first_out = crs;
1753                        l_ptr->out_queue_size -= released;
1754                }
1755
1756                /* Try sending any messages link endpoint has pending */
1757
1758                if (unlikely(l_ptr->next_out))
1759                        tipc_link_push_queue(l_ptr);
1760                if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1761                        tipc_link_wakeup_ports(l_ptr, 0);
                    /* Send a STATE message once enough messages are unacknowledged */
1762                if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1763                        l_ptr->stats.sent_acks++;
1764                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1765                }
1766
1767                /* Now (finally!) process the incoming message */
1768
                    /*
                     * Also re-entered from the CHANGEOVER_PROTOCOL case below
                     * after l_ptr, buf, msg and seq_no have been replaced by
                     * those of the extracted message
                     */
1769protocol_check:
1770                if (likely(link_working_working(l_ptr))) {
1771                        if (likely(seq_no == mod(l_ptr->next_in_no))) {
1772                                l_ptr->next_in_no++;
1773                                if (unlikely(l_ptr->oldest_deferred_in))
1774                                        head = link_insert_deferred_queue(l_ptr,
1775                                                                          head);
1776                                if (likely(msg_is_dest(msg, tipc_own_addr))) {
1777deliver:
1778                                        if (likely(msg_isdata(msg))) {
1779                                                tipc_node_unlock(n_ptr);
1780                                                tipc_port_recv_msg(buf);
1781                                                continue;
1782                                        }
1783                                        switch (msg_user(msg)) {
1784                                        case MSG_BUNDLER:
1785                                                l_ptr->stats.recv_bundles++;
1786                                                l_ptr->stats.recv_bundled +=
1787                                                        msg_msgcnt(msg);
1788                                                tipc_node_unlock(n_ptr);
1789                                                tipc_link_recv_bundle(buf);
1790                                                continue;
1791                                        case NAME_DISTRIBUTOR:
1792                                                tipc_node_unlock(n_ptr);
1793                                                tipc_named_recv(buf);
1794                                                continue;
1795                                        case CONN_MANAGER:
1796                                                tipc_node_unlock(n_ptr);
1797                                                tipc_port_recv_proto_msg(buf);
1798                                                continue;
1799                                        case MSG_FRAGMENTER:
1800                                                l_ptr->stats.recv_fragments++;
                                                    /*
                                                     * Non-zero return: buf and msg
                                                     * now refer to a message that is
                                                     * delivered like any other
                                                     */
1801                                                if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1802                                                                            &buf, &msg)) {
1803                                                        l_ptr->stats.recv_fragmented++;
1804                                                        goto deliver;
1805                                                }
1806                                                break;
1807                                        case CHANGEOVER_PROTOCOL:
                                                    /* Read type first; the call below may replace buf */
1808                                                type = msg_type(msg);
1809                                                if (link_recv_changeover_msg(&l_ptr, &buf)) {
1810                                                        msg = buf_msg(buf);
1811                                                        seq_no = msg_seqno(msg);
1812                                                        if (type == ORIGINAL_MSG)
1813                                                                goto deliver;
1814                                                        goto protocol_check;
1815                                                }
1816                                                break;
1817                                        default:
1818                                                buf_discard(buf);
1819                                                buf = NULL;
1820                                                break;
1821                                        }
1822                                }
                                    /*
                                     * NOTE(review): buf is NULL here after the
                                     * default case above (and possibly after a
                                     * fragment/changeover call that returned 0)
                                     * -- confirm tipc_net_route_msg() accepts NULL.
                                     */
1823                                tipc_node_unlock(n_ptr);
1824                                tipc_net_route_msg(buf);
1825                                continue;
1826                        }
                            /* Working link, but not the sequence number expected next */
1827                        link_handle_out_of_seq_msg(l_ptr, buf);
1828                        head = link_insert_deferred_queue(l_ptr, head);
1829                        tipc_node_unlock(n_ptr);
1830                        continue;
1831                }
1832
                    /* Link is not in the working-working state */
1833                if (msg_user(msg) == LINK_PROTOCOL) {
1834                        link_recv_proto_msg(l_ptr, buf);
1835                        head = link_insert_deferred_queue(l_ptr, head);
1836                        tipc_node_unlock(n_ptr);
1837                        continue;
1838                }
1839                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1840
                    /* If the event made the link working, process buf again */
1841                if (link_working_working(l_ptr)) {
1842                        /* Re-insert in front of queue */
1843                        buf->next = head;
1844                        head = buf;
1845                        tipc_node_unlock(n_ptr);
1846                        continue;
1847                }
1848                tipc_node_unlock(n_ptr);
1849cont:
1850                buf_discard(buf);
1851        }
1852        read_unlock_bh(&tipc_net_lock);
1853}
1854
1855/*
1856 * link_defer_buf(): Sort a received out-of-sequence packet
1857 *                   into the deferred reception queue.
1858 * Returns the increase of the queue length,i.e. 0 or 1
1859 */
1860
1861u32 tipc_link_defer_pkt(struct sk_buff **head,
1862                        struct sk_buff **tail,
1863                        struct sk_buff *buf)
1864{
1865        struct sk_buff *prev = NULL;
1866        struct sk_buff *crs = *head;
1867        u32 seq_no = buf_seqno(buf);
1868
1869        buf->next = NULL;
1870
1871        /* Empty queue ? */
1872        if (*head == NULL) {
1873                *head = *tail = buf;
1874                return 1;
1875        }
1876
1877        /* Last ? */
1878        if (less(buf_seqno(*tail), seq_no)) {
1879                (*tail)->next = buf;
1880                *tail = buf;
1881                return 1;
1882        }
1883
1884        /* Scan through queue and sort it in */
1885        do {
1886                struct tipc_msg *msg = buf_msg(crs);
1887
1888                if (less(seq_no, msg_seqno(msg))) {
1889                        buf->next = crs;
1890                        if (prev)
1891                                prev->next = buf;
1892                        else
1893                                *head = buf;
1894                        return 1;
1895                }
1896                if (seq_no == msg_seqno(msg))
1897                        break;
1898                prev = crs;
1899                crs = crs->next;
1900        } while (crs);
1901
1902        /* Message is a duplicate of an existing message */
1903
1904        buf_discard(buf);
1905        return 0;
1906}
1907
1908/**
1909 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1910 */
1911
1912static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1913                                       struct sk_buff *buf)
1914{
1915        u32 seq_no = buf_seqno(buf);
1916
1917        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1918                link_recv_proto_msg(l_ptr, buf);
1919                return;
1920        }
1921
1922        /* Record OOS packet arrival (force mismatch on next timeout) */
1923
1924        l_ptr->checkpoint--;
1925
1926        /*
1927         * Discard packet if a duplicate; otherwise add it to deferred queue
1928         * and notify peer of gap as per protocol specification
1929         */
1930
1931        if (less(seq_no, mod(l_ptr->next_in_no))) {
1932                l_ptr->stats.duplicates++;
1933                buf_discard(buf);
1934                return;
1935        }
1936
1937        if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1938                                &l_ptr->newest_deferred_in, buf)) {
1939                l_ptr->deferred_inqueue_sz++;
1940                l_ptr->stats.deferred_recv++;
1941                if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1942                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1943        } else
1944                l_ptr->stats.duplicates++;
1945}
1946
1947/*
1948 * Send protocol message to the other endpoint.
     *
     * @l_ptr: link to send on; its pre-built header l_ptr->pmsg is updated
     * @msg_typ: STATE_MSG, or RESET_MSG / ACTIVATE_MSG
     * @probe_msg: non-zero to mark a STATE_MSG as a probe
     * @gap: sequence gap to report in a STATE_MSG; replaced by a locally
     *       computed value when the deferred receive queue is not empty
     * @tolerance: link tolerance value placed in a STATE_MSG
     * @priority: link priority value placed in a STATE_MSG (see the note at the
     *            second msg_set_linkprio() call below)
     * @ack_mtu: max packet value placed in a STATE_MSG
     *
     * Nothing is sent if the link is blocked, if the owner node has block_setup
     * set and the message is not a RESET_MSG, or if a STATE_MSG is requested
     * while the link is not up.  If the bearer is congested the header is kept
     * in l_ptr->proto_msg_queue instead of being sent.
1949 */
1950void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1951                                int probe_msg, u32 gap, u32 tolerance,
1952                                u32 priority, u32 ack_mtu)
1953{
1954        struct sk_buff *buf = NULL;
1955        struct tipc_msg *msg = l_ptr->pmsg;
1956        u32 msg_size = sizeof(l_ptr->proto_msg);
1957        int r_flag;
1958
1959        if (link_blocked(l_ptr))
1960                return;
1961
1962        /* Abort non-RESET send if communication with node is prohibited */
1963
1964        if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
1965                return;
1966
1967        msg_set_type(msg, msg_typ);
1968        msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1969        msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
1970        msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1971
1972        if (msg_typ == STATE_MSG) {
1973                u32 next_sent = mod(l_ptr->next_out_no);
1974
1975                if (!tipc_link_is_up(l_ptr))
1976                        return;
                    /* Unsent messages queued: report the first of them instead */
1977                if (l_ptr->next_out)
1978                        next_sent = buf_seqno(l_ptr->next_out);
1979                msg_set_next_sent(msg, next_sent);
                    /* Gap = distance from next expected to oldest deferred packet */
1980                if (l_ptr->oldest_deferred_in) {
1981                        u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
1982                        gap = mod(rec - mod(l_ptr->next_in_no));
1983                }
1984                msg_set_seq_gap(msg, gap);
1985                if (gap)
1986                        l_ptr->stats.sent_nacks++;
1987                msg_set_link_tolerance(msg, tolerance);
1988                msg_set_linkprio(msg, priority);
1989                msg_set_max_pkt(msg, ack_mtu);
1990                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1991                msg_set_probe(msg, probe_msg != 0);
1992                if (probe_msg) {
1993                        u32 mtu = l_ptr->max_pkt;
1994
                            /*
                             * While max_pkt is below its target, send the probe
                             * with a size about halfway between the two, rounded
                             * to a multiple of 4.  After 10 probes at one target
                             * the target is lowered to just under the probe size.
                             */
1995                        if ((mtu < l_ptr->max_pkt_target) &&
1996                            link_working_working(l_ptr) &&
1997                            l_ptr->fsm_msg_cnt) {
1998                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1999                                if (l_ptr->max_pkt_probes == 10) {
2000                                        l_ptr->max_pkt_target = (msg_size - 4);
2001                                        l_ptr->max_pkt_probes = 0;
2002                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2003                                }
2004                                l_ptr->max_pkt_probes++;
2005                        }
2006
2007                        l_ptr->stats.sent_probes++;
2008                }
2009                l_ptr->stats.sent_states++;
2010        } else {                /* RESET_MSG or ACTIVATE_MSG */
2011                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2012                msg_set_seq_gap(msg, 0);
2013                msg_set_next_sent(msg, 1);
2014                msg_set_probe(msg, 0);
2015                msg_set_link_tolerance(msg, l_ptr->tolerance);
2016                msg_set_linkprio(msg, l_ptr->priority);
2017                msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2018        }
2019
            /* Flag set when the owner has more working links than just this one */
2020        r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
2021        msg_set_redundant_link(msg, r_flag);
            /*
             * NOTE(review): this overwrites the priority written in the
             * STATE_MSG branch above, so the "priority" argument never reaches
             * the wire -- confirm this is intended.
             */
2022        msg_set_linkprio(msg, l_ptr->priority);
2023
2024        /* Ensure sequence number will not fit : */
            /*
             * i.e. use a value half the 0xffff range away from next_out_no;
             * presumably so the receiver never sees it as in-sequence
             */
2025
2026        msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2027
2028        /* Congestion? */
2029
            /*
             * Keep (at most) one pending protocol message: the queued buffer is
             * allocated on first use and overwritten with the latest header
             */
2030        if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2031                if (!l_ptr->proto_msg_queue) {
2032                        l_ptr->proto_msg_queue =
2033                                tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2034                }
2035                buf = l_ptr->proto_msg_queue;
2036                if (!buf)
2037                        return;
2038                skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2039                return;
2040        }
2041
2042        /* Message can be sent */
2043
2044        buf = tipc_buf_acquire(msg_size);
2045        if (!buf)
2046                return;
2047
            /*
             * NOTE(review): for an enlarged probe only the header part is
             * copied; the remaining msg_size - sizeof(proto_msg) bytes hold
             * whatever tipc_buf_acquire() returned -- confirm that is acceptable.
             */
2048        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2049        msg_set_size(buf_msg(buf), msg_size);
2050
            /* Non-zero return is treated as "sent": reset the unacked count */
2051        if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2052                l_ptr->unacked_window = 0;
2053                buf_discard(buf);
2054                return;
2055        }
2056
2057        /* New congestion */
2058        tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2059        l_ptr->proto_msg_queue = buf;
2060        l_ptr->stats.bearer_congs++;
2061}
2062
2063/*
2064 * Receive protocol message :
2065 * Note that network plane id propagates through the network, and may
2066 * change at any time. The node with lowest address rules
     *
     * @l_ptr: link the message belongs to
     * @buf: buffer holding the protocol message; always discarded before return
     *
     * Handles RESET_MSG, ACTIVATE_MSG and STATE_MSG.  Nothing but the discard
     * happens if the link is blocked.
2067 */
2068
2069static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2070{
2071        u32 rec_gap = 0;
2072        u32 max_pkt_info;
2073        u32 max_pkt_ack;
2074        u32 msg_tol;
2075        struct tipc_msg *msg = buf_msg(buf);
2076
2077        if (link_blocked(l_ptr))
2078                goto exit;
2079
2080        /* record unnumbered packet arrival (force mismatch on next timeout) */
2081
2082        l_ptr->checkpoint--;
2083
            /* Adopt the peer's network plane only if our own address is higher */
2084        if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2085                if (tipc_own_addr > msg_prevnode(msg))
2086                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2087
2088        l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2089
2090        switch (msg_type(msg)) {
2091
2092        case RESET_MSG:
2093                if (!link_working_unknown(l_ptr) &&
2094                    (l_ptr->peer_session != INVALID_SESSION)) {
2095                        if (less_eq(msg_session(msg), l_ptr->peer_session))
2096                                break; /* duplicate or old reset: ignore */
2097                }
2098
2099                if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
2100                                link_working_unknown(l_ptr))) {
2101                        /*
2102                         * peer has lost contact -- don't allow peer's links
2103                         * to reactivate before we recognize loss & clean up
2104                         */
2105                        l_ptr->owner->block_setup = WAIT_NODE_DOWN;
2106                }
2107
2108                /* fall thru' */
2109        case ACTIVATE_MSG:
2110                /* Update link settings according other endpoint's values */
2111
                    /*
                     * Replace the part of the link name after the last ':'
                     * with the string carried in the message data.
                     * NOTE(review): the copy has no length bound and the
                     * strrchr() result is not checked -- confirm the message
                     * data is NUL-terminated and fits the space left in
                     * l_ptr->name, and that the name always contains ':'.
                     */
2112                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2113
                    /* Tolerance and priority are only ever raised here */
2114                msg_tol = msg_link_tolerance(msg);
2115                if (msg_tol > l_ptr->tolerance)
2116                        link_set_supervision_props(l_ptr, msg_tol);
2117
2118                if (msg_linkprio(msg) > l_ptr->priority)
2119                        l_ptr->priority = msg_linkprio(msg);
2120
                    /*
                     * Non-zero value: lower our target to the peer's and make
                     * sure max_pkt does not exceed it.  Zero: use our own target
                     * directly and record broadcast link as unsupported.
                     */
2121                max_pkt_info = msg_max_pkt(msg);
2122                if (max_pkt_info) {
2123                        if (max_pkt_info < l_ptr->max_pkt_target)
2124                                l_ptr->max_pkt_target = max_pkt_info;
2125                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2126                                l_ptr->max_pkt = l_ptr->max_pkt_target;
2127                } else {
2128                        l_ptr->max_pkt = l_ptr->max_pkt_target;
2129                }
2130                l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2131
2132                link_state_event(l_ptr, msg_type(msg));
2133
2134                l_ptr->peer_session = msg_session(msg);
2135                l_ptr->peer_bearer_id = msg_bearer_id(msg);
2136
2137                /* Synchronize broadcast sequence numbers */
2138                if (!tipc_node_redundant_links(l_ptr->owner))
2139                        l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2140                break;
2141        case STATE_MSG:
2142
                    /* Unlike RESET/ACTIVATE, any non-zero tolerance is applied */
2143                msg_tol = msg_link_tolerance(msg);
2144                if (msg_tol)
2145                        link_set_supervision_props(l_ptr, msg_tol);
2146
2147                if (msg_linkprio(msg) &&
2148                    (msg_linkprio(msg) != l_ptr->priority)) {
2149                        warn("Resetting link <%s>, priority change %u->%u\n",
2150                             l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2151                        l_ptr->priority = msg_linkprio(msg);
2152                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
2153                        break;
2154                }
2155                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2156                l_ptr->stats.recv_states++;
2157                if (link_reset_unknown(l_ptr))
2158                        break;
2159
                    /* Peer has sent packets we have not received: note how many */
2160                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2161                        rec_gap = mod(msg_next_sent(msg) -
2162                                      mod(l_ptr->next_in_no));
2163                }
2164
                    /* A larger value reported by the peer raises max_pkt */
2165                max_pkt_ack = msg_max_pkt(msg);
2166                if (max_pkt_ack > l_ptr->max_pkt) {
2167                        l_ptr->max_pkt = max_pkt_ack;
2168                        l_ptr->max_pkt_probes = 0;
2169                }
2170
                    /*
                     * max_pkt_ack is reused: from here on it is the size to
                     * report back for an enlarged probe (0 if none)
                     */
2171                max_pkt_ack = 0;
2172                if (msg_probe(msg)) {
2173                        l_ptr->stats.recv_probes++;
2174                        if (msg_size(msg) > sizeof(l_ptr->proto_msg))
2175                                max_pkt_ack = msg_size(msg);
2176                }
2177
2178                /* Protocol message before retransmits, reduce loss risk */
2179
2180                tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2181
2182                if (rec_gap || (msg_probe(msg))) {
2183                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2184                                                 0, rec_gap, 0, 0, max_pkt_ack);
2185                }
                    /* Peer reported a gap: retransmit from the start of our queue */
2186                if (msg_seq_gap(msg)) {
2187                        l_ptr->stats.recv_nacks++;
2188                        tipc_link_retransmit(l_ptr, l_ptr->first_out,
2189                                             msg_seq_gap(msg));
2190                }
2191                break;
2192        }
2193exit:
2194        buf_discard(buf);
2195}
2196
2197
2198/*
2199 * tipc_link_tunnel(): Send one message via a link belonging to
2200 * another bearer. Owner node is locked.
2201 */
2202static void tipc_link_tunnel(struct tipc_link *l_ptr,
2203                             struct tipc_msg *tunnel_hdr,
2204                             struct tipc_msg  *msg,
2205                             u32 selector)
2206{
2207        struct tipc_link *tunnel;
2208        struct sk_buff *buf;
2209        u32 length = msg_size(msg);
2210
2211        tunnel = l_ptr->owner->active_links[selector & 1];
2212        if (!tipc_link_is_up(tunnel)) {
2213                warn("Link changeover error, "
2214                     "tunnel link no longer available\n");
2215                return;
2216        }
2217        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2218        buf = tipc_buf_acquire(length + INT_H_SIZE);
2219        if (!buf) {
2220                warn("Link changeover error, "
2221                     "unable to send tunnel msg\n");
2222                return;
2223        }
2224        skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2225        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2226        tipc_link_send_buf(tunnel, buf);
2227}
2228
2229
2230
2231/*
2232 * changeover(): Send whole message queue via the remaining link
2233 *               Owner node is locked.
2234 */
2235
2236void tipc_link_changeover(struct tipc_link *l_ptr)
2237{
2238        u32 msgcount = l_ptr->out_queue_size;
2239        struct sk_buff *crs = l_ptr->first_out;
2240        struct tipc_link *tunnel = l_ptr->owner->active_links[0];
2241        struct tipc_msg tunnel_hdr;
2242        int split_bundles;
2243
2244        if (!tunnel)
2245                return;
2246
2247        if (!l_ptr->owner->permit_changeover) {
2248                warn("Link changeover error, "
2249                     "peer did not permit changeover\n");
2250                return;
2251        }
2252
2253        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2254                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2255        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2256        msg_set_msgcnt(&tunnel_hdr, msgcount);
2257
2258        if (!l_ptr->first_out) {
2259                struct sk_buff *buf;
2260
2261                buf = tipc_buf_acquire(INT_H_SIZE);
2262                if (buf) {
2263                        skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2264                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
2265                        tipc_link_send_buf(tunnel, buf);
2266                } else {
2267                        warn("Link changeover error, "
2268                             "unable to send changeover msg\n");
2269                }
2270                return;
2271        }
2272
2273        split_bundles = (l_ptr->owner->active_links[0] !=
2274                         l_ptr->owner->active_links[1]);
2275
2276        while (crs) {
2277                struct tipc_msg *msg = buf_msg(crs);
2278
2279                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2280                        struct tipc_msg *m = msg_get_wrapped(msg);
2281                        unchar *pos = (unchar *)m;
2282
2283                        msgcount = msg_msgcnt(msg);
2284                        while (msgcount--) {
2285                                msg_set_seqno(m, msg_seqno(msg));
2286                                tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2287                                                 msg_link_selector(m));
2288                                pos += align(msg_size(m));
2289                                m = (struct tipc_msg *)pos;
2290                        }
2291                } else {
2292                        tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2293                                         msg_link_selector(msg));
2294                }
2295                crs = crs->next;
2296        }
2297}
2298
2299void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
2300{
2301        struct sk_buff *iter;
2302        struct tipc_msg tunnel_hdr;
2303
2304        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2305                 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2306        msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2307        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2308        iter = l_ptr->first_out;
2309        while (iter) {
2310                struct sk_buff *outbuf;
2311                struct tipc_msg *msg = buf_msg(iter);
2312                u32 length = msg_size(msg);
2313
2314                if (msg_user(msg) == MSG_BUNDLER)
2315                        msg_set_type(msg, CLOSED_MSG);
2316                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2317                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2318                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2319                outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2320                if (outbuf == NULL) {
2321                        warn("Link changeover error, "
2322                             "unable to send duplicate msg\n");
2323                        return;
2324                }
2325                skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2326                skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2327                                               length);
2328                tipc_link_send_buf(tunnel, outbuf);
2329                if (!tipc_link_is_up(l_ptr))
2330                        return;
2331                iter = iter->next;
2332        }
2333}
2334
2335
2336
2337/**
2338 * buf_extract - extracts embedded TIPC message from another message
2339 * @skb: encapsulating message buffer
2340 * @from_pos: offset to extract from
2341 *
2342 * Returns a new message buffer containing an embedded message.  The
2343 * encapsulating message itself is left unchanged.
2344 */
2345
2346static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2347{
2348        struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2349        u32 size = msg_size(msg);
2350        struct sk_buff *eb;
2351
2352        eb = tipc_buf_acquire(size);
2353        if (eb)
2354                skb_copy_to_linear_data(eb, msg, size);
2355        return eb;
2356}
2357
2358/*
2359 *  link_recv_changeover_msg(): Receive tunneled packet sent
2360 *  via other link. Node is locked. Return extracted buffer.
2361 */
2362
2363static int link_recv_changeover_msg(struct tipc_link **l_ptr,
2364                                    struct sk_buff **buf)
2365{
2366        struct sk_buff *tunnel_buf = *buf;
2367        struct tipc_link *dest_link;
2368        struct tipc_msg *msg;
2369        struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2370        u32 msg_typ = msg_type(tunnel_msg);
2371        u32 msg_count = msg_msgcnt(tunnel_msg);
2372
2373        dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2374        if (!dest_link)
2375                goto exit;
2376        if (dest_link == *l_ptr) {
2377                err("Unexpected changeover message on link <%s>\n",
2378                    (*l_ptr)->name);
2379                goto exit;
2380        }
2381        *l_ptr = dest_link;
2382        msg = msg_get_wrapped(tunnel_msg);
2383
2384        if (msg_typ == DUPLICATE_MSG) {
2385                if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
2386                        goto exit;
2387                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2388                if (*buf == NULL) {
2389                        warn("Link changeover error, duplicate msg dropped\n");
2390                        goto exit;
2391                }
2392                buf_discard(tunnel_buf);
2393                return 1;
2394        }
2395
2396        /* First original message ?: */
2397
2398        if (tipc_link_is_up(dest_link)) {
2399                info("Resetting link <%s>, changeover initiated by peer\n",
2400                     dest_link->name);
2401                tipc_link_reset(dest_link);
2402                dest_link->exp_msg_count = msg_count;
2403                if (!msg_count)
2404                        goto exit;
2405        } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2406                dest_link->exp_msg_count = msg_count;
2407                if (!msg_count)
2408                        goto exit;
2409        }
2410
2411        /* Receive original message */
2412
2413        if (dest_link->exp_msg_count == 0) {
2414                warn("Link switchover error, "
2415                     "got too many tunnelled messages\n");
2416                goto exit;
2417        }
2418        dest_link->exp_msg_count--;
2419        if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2420                goto exit;
2421        } else {
2422                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2423                if (*buf != NULL) {
2424                        buf_discard(tunnel_buf);
2425                        return 1;
2426                } else {
2427                        warn("Link changeover error, original msg dropped\n");
2428                }
2429        }
2430exit:
2431        *buf = NULL;
2432        buf_discard(tunnel_buf);
2433        return 0;
2434}
2435
2436/*
2437 *  Bundler functionality:
2438 */
2439void tipc_link_recv_bundle(struct sk_buff *buf)
2440{
2441        u32 msgcount = msg_msgcnt(buf_msg(buf));
2442        u32 pos = INT_H_SIZE;
2443        struct sk_buff *obuf;
2444
2445        while (msgcount--) {
2446                obuf = buf_extract(buf, pos);
2447                if (obuf == NULL) {
2448                        warn("Link unable to unbundle message(s)\n");
2449                        break;
2450                }
2451                pos += align(msg_size(buf_msg(obuf)));
2452                tipc_net_route_msg(obuf);
2453        }
2454        buf_discard(buf);
2455}
2456
2457/*
2458 *  Fragmentation/defragmentation:
2459 */
2460
2461
2462/*
2463 * link_send_long_buf: Entry for buffers needing fragmentation.
2464 * The buffer is complete, inclusive total message length.
2465 * Returns user data length.
2466 */
2467static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2468{
2469        struct sk_buff *buf_chain = NULL;
2470        struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
2471        struct tipc_msg *inmsg = buf_msg(buf);
2472        struct tipc_msg fragm_hdr;
2473        u32 insize = msg_size(inmsg);
2474        u32 dsz = msg_data_sz(inmsg);
2475        unchar *crs = buf->data;
2476        u32 rest = insize;
2477        u32 pack_sz = l_ptr->max_pkt;
2478        u32 fragm_sz = pack_sz - INT_H_SIZE;
2479        u32 fragm_no = 0;
2480        u32 destaddr;
2481
2482        if (msg_short(inmsg))
2483                destaddr = l_ptr->addr;
2484        else
2485                destaddr = msg_destnode(inmsg);
2486
2487        /* Prepare reusable fragment header: */
2488
2489        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2490                 INT_H_SIZE, destaddr);
2491
2492        /* Chop up message: */
2493
2494        while (rest > 0) {
2495                struct sk_buff *fragm;
2496
2497                if (rest <= fragm_sz) {
2498                        fragm_sz = rest;
2499                        msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2500                }
2501                fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2502                if (fragm == NULL) {
2503                        buf_discard(buf);
2504                        while (buf_chain) {
2505                                buf = buf_chain;
2506                                buf_chain = buf_chain->next;
2507                                buf_discard(buf);
2508                        }
2509                        return -ENOMEM;
2510                }
2511                msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2512                fragm_no++;
2513                msg_set_fragm_no(&fragm_hdr, fragm_no);
2514                skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2515                skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2516                                               fragm_sz);
2517                buf_chain_tail->next = fragm;
2518                buf_chain_tail = fragm;
2519
2520                rest -= fragm_sz;
2521                crs += fragm_sz;
2522                msg_set_type(&fragm_hdr, FRAGMENT);
2523        }
2524        buf_discard(buf);
2525
2526        /* Append chain of fragments to send queue & send them */
2527
2528        l_ptr->long_msg_seq_no++;
2529        link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
2530        l_ptr->stats.sent_fragments += fragm_no;
2531        l_ptr->stats.sent_fragmented++;
2532        tipc_link_push_queue(l_ptr);
2533
2534        return dsz;
2535}
2536
2537/*
2538 * A pending message being re-assembled must store certain values
2539 * to handle subsequent fragments correctly. The following functions
2540 * help storing these values in unused, available fields in the
2541 * pending message. This makes dynamic memory allocation unnecessary.
2542 */
2543
2544static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2545{
2546        msg_set_seqno(buf_msg(buf), seqno);
2547}
2548
2549static u32 get_fragm_size(struct sk_buff *buf)
2550{
2551        return msg_ack(buf_msg(buf));
2552}
2553
2554static void set_fragm_size(struct sk_buff *buf, u32 sz)
2555{
2556        msg_set_ack(buf_msg(buf), sz);
2557}
2558
2559static u32 get_expected_frags(struct sk_buff *buf)
2560{
2561        return msg_bcast_ack(buf_msg(buf));
2562}
2563
2564static void set_expected_frags(struct sk_buff *buf, u32 exp)
2565{
2566        msg_set_bcast_ack(buf_msg(buf), exp);
2567}
2568
2569static u32 get_timer_cnt(struct sk_buff *buf)
2570{
2571        return msg_reroute_cnt(buf_msg(buf));
2572}
2573
/* Advance the ageing counter kept in the pending buffer's reroute count field */
static void incr_timer_cnt(struct sk_buff *buf)
{
	struct tipc_msg *hdr = buf_msg(buf);

	msg_incr_reroute_cnt(hdr);
}
2578
2579/*
2580 * tipc_link_recv_fragment(): Called with node lock on. Returns
2581 * the reassembled buffer if message is complete.
2582 */
2583int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2584                            struct tipc_msg **m)
2585{
2586        struct sk_buff *prev = NULL;
2587        struct sk_buff *fbuf = *fb;
2588        struct tipc_msg *fragm = buf_msg(fbuf);
2589        struct sk_buff *pbuf = *pending;
2590        u32 long_msg_seq_no = msg_long_msgno(fragm);
2591
2592        *fb = NULL;
2593
2594        /* Is there an incomplete message waiting for this fragment? */
2595
2596        while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
2597                        (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2598                prev = pbuf;
2599                pbuf = pbuf->next;
2600        }
2601
2602        if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2603                struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2604                u32 msg_sz = msg_size(imsg);
2605                u32 fragm_sz = msg_data_sz(fragm);
2606                u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2607                u32 max =  TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
2608                if (msg_type(imsg) == TIPC_MCAST_MSG)
2609                        max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2610                if (msg_size(imsg) > max) {
2611                        buf_discard(fbuf);
2612                        return 0;
2613                }
2614                pbuf = tipc_buf_acquire(msg_size(imsg));
2615                if (pbuf != NULL) {
2616                        pbuf->next = *pending;
2617                        *pending = pbuf;
2618                        skb_copy_to_linear_data(pbuf, imsg,
2619                                                msg_data_sz(fragm));
2620                        /*  Prepare buffer for subsequent fragments. */
2621
2622                        set_long_msg_seqno(pbuf, long_msg_seq_no);
2623                        set_fragm_size(pbuf, fragm_sz);
2624                        set_expected_frags(pbuf, exp_fragm_cnt - 1);
2625                } else {
2626                        warn("Link unable to reassemble fragmented message\n");
2627                }
2628                buf_discard(fbuf);
2629                return 0;
2630        } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2631                u32 dsz = msg_data_sz(fragm);
2632                u32 fsz = get_fragm_size(pbuf);
2633                u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2634                u32 exp_frags = get_expected_frags(pbuf) - 1;
2635                skb_copy_to_linear_data_offset(pbuf, crs,
2636                                               msg_data(fragm), dsz);
2637                buf_discard(fbuf);
2638
2639                /* Is message complete? */
2640
2641                if (exp_frags == 0) {
2642                        if (prev)
2643                                prev->next = pbuf->next;
2644                        else
2645                                *pending = pbuf->next;
2646                        msg_reset_reroute_cnt(buf_msg(pbuf));
2647                        *fb = pbuf;
2648                        *m = buf_msg(pbuf);
2649                        return 1;
2650                }
2651                set_expected_frags(pbuf, exp_frags);
2652                return 0;
2653        }
2654        buf_discard(fbuf);
2655        return 0;
2656}
2657
2658/**
2659 * link_check_defragm_bufs - flush stale incoming message fragments
2660 * @l_ptr: pointer to link
2661 */
2662
2663static void link_check_defragm_bufs(struct tipc_link *l_ptr)
2664{
2665        struct sk_buff *prev = NULL;
2666        struct sk_buff *next = NULL;
2667        struct sk_buff *buf = l_ptr->defragm_buf;
2668
2669        if (!buf)
2670                return;
2671        if (!link_working_working(l_ptr))
2672                return;
2673        while (buf) {
2674                u32 cnt = get_timer_cnt(buf);
2675
2676                next = buf->next;
2677                if (cnt < 4) {
2678                        incr_timer_cnt(buf);
2679                        prev = buf;
2680                } else {
2681                        if (prev)
2682                                prev->next = buf->next;
2683                        else
2684                                l_ptr->defragm_buf = buf->next;
2685                        buf_discard(buf);
2686                }
2687                buf = next;
2688        }
2689}
2690
2691
2692
2693static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2694{
2695        if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
2696                return;
2697
2698        l_ptr->tolerance = tolerance;
2699        l_ptr->continuity_interval =
2700                ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2701        l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2702}
2703
2704
2705void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
2706{
2707        /* Data messages from this node, inclusive FIRST_FRAGM */
2708        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2709        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2710        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2711        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2712        /* Transiting data messages,inclusive FIRST_FRAGM */
2713        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2714        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2715        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2716        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2717        l_ptr->queue_limit[CONN_MANAGER] = 1200;
2718        l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2719        l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2720        /* FRAGMENT and LAST_FRAGMENT packets */
2721        l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2722}
2723
2724/**
2725 * link_find_link - locate link by name
2726 * @name - ptr to link name string
2727 * @node - ptr to area to be filled with ptr to associated node
2728 *
2729 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2730 * this also prevents link deletion.
2731 *
2732 * Returns pointer to link (or 0 if invalid link name).
2733 */
2734
2735static struct tipc_link *link_find_link(const char *name,
2736                                        struct tipc_node **node)
2737{
2738        struct tipc_link_name link_name_parts;
2739        struct tipc_bearer *b_ptr;
2740        struct tipc_link *l_ptr;
2741
2742        if (!link_name_validate(name, &link_name_parts))
2743                return NULL;
2744
2745        b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2746        if (!b_ptr)
2747                return NULL;
2748
2749        *node = tipc_node_find(link_name_parts.addr_peer);
2750        if (!*node)
2751                return NULL;
2752
2753        l_ptr = (*node)->links[b_ptr->identity];
2754        if (!l_ptr || strcmp(l_ptr->name, name))
2755                return NULL;
2756
2757        return l_ptr;
2758}
2759
2760/**
2761 * link_value_is_valid -- validate proposed link tolerance/priority/window
2762 *
2763 * @cmd - value type (TIPC_CMD_SET_LINK_*)
2764 * @new_value - the new value
2765 *
2766 * Returns 1 if value is within range, 0 if not.
2767 */
2768
2769static int link_value_is_valid(u16 cmd, u32 new_value)
2770{
2771        switch (cmd) {
2772        case TIPC_CMD_SET_LINK_TOL:
2773                return (new_value >= TIPC_MIN_LINK_TOL) &&
2774                        (new_value <= TIPC_MAX_LINK_TOL);
2775        case TIPC_CMD_SET_LINK_PRI:
2776                return (new_value <= TIPC_MAX_LINK_PRI);
2777        case TIPC_CMD_SET_LINK_WINDOW:
2778                return (new_value >= TIPC_MIN_LINK_WIN) &&
2779                        (new_value <= TIPC_MAX_LINK_WIN);
2780        }
2781        return 0;
2782}
2783
2784
2785/**
2786 * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
2787 * @name - ptr to link, bearer, or media name
2788 * @new_value - new value of link, bearer, or media setting
2789 * @cmd - which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
2790 *
2791 * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted.
2792 *
2793 * Returns 0 if value updated and negative value on error.
2794 */
2795
2796static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2797{
2798        struct tipc_node *node;
2799        struct tipc_link *l_ptr;
2800        struct tipc_bearer *b_ptr;
2801        struct tipc_media *m_ptr;
2802
2803        l_ptr = link_find_link(name, &node);
2804        if (l_ptr) {
2805                /*
2806                 * acquire node lock for tipc_link_send_proto_msg().
2807                 * see "TIPC locking policy" in net.c.
2808                 */
2809                tipc_node_lock(node);
2810                switch (cmd) {
2811                case TIPC_CMD_SET_LINK_TOL:
2812                        link_set_supervision_props(l_ptr, new_value);
2813                        tipc_link_send_proto_msg(l_ptr,
2814                                STATE_MSG, 0, 0, new_value, 0, 0);
2815                        break;
2816                case TIPC_CMD_SET_LINK_PRI:
2817                        l_ptr->priority = new_value;
2818                        tipc_link_send_proto_msg(l_ptr,
2819                                STATE_MSG, 0, 0, 0, new_value, 0);
2820                        break;
2821                case TIPC_CMD_SET_LINK_WINDOW:
2822                        tipc_link_set_queue_limits(l_ptr, new_value);
2823                        break;
2824                }
2825                tipc_node_unlock(node);
2826                return 0;
2827        }
2828
2829        b_ptr = tipc_bearer_find(name);
2830        if (b_ptr) {
2831                switch (cmd) {
2832                case TIPC_CMD_SET_LINK_TOL:
2833                        b_ptr->tolerance = new_value;
2834                        return 0;
2835                case TIPC_CMD_SET_LINK_PRI:
2836                        b_ptr->priority = new_value;
2837                        return 0;
2838                case TIPC_CMD_SET_LINK_WINDOW:
2839                        b_ptr->window = new_value;
2840                        return 0;
2841                }
2842                return -EINVAL;
2843        }
2844
2845        m_ptr = tipc_media_find(name);
2846        if (!m_ptr)
2847                return -ENODEV;
2848        switch (cmd) {
2849        case TIPC_CMD_SET_LINK_TOL:
2850                m_ptr->tolerance = new_value;
2851                return 0;
2852        case TIPC_CMD_SET_LINK_PRI:
2853                m_ptr->priority = new_value;
2854                return 0;
2855        case TIPC_CMD_SET_LINK_WINDOW:
2856                m_ptr->window = new_value;
2857                return 0;
2858        }
2859        return -EINVAL;
2860}
2861
2862struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2863                                     u16 cmd)
2864{
2865        struct tipc_link_config *args;
2866        u32 new_value;
2867        int res;
2868
2869        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2870                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2871
2872        args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2873        new_value = ntohl(args->value);
2874
2875        if (!link_value_is_valid(cmd, new_value))
2876                return tipc_cfg_reply_error_string(
2877                        "cannot change, value invalid");
2878
2879        if (!strcmp(args->name, tipc_bclink_name)) {
2880                if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2881                    (tipc_bclink_set_queue_limits(new_value) == 0))
2882                        return tipc_cfg_reply_none();
2883                return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2884                                                   " (cannot change setting on broadcast link)");
2885        }
2886
2887        read_lock_bh(&tipc_net_lock);
2888        res = link_cmd_set_value(args->name, new_value, cmd);
2889        read_unlock_bh(&tipc_net_lock);
2890        if (res)
2891                return tipc_cfg_reply_error_string("cannot change link setting");
2892
2893        return tipc_cfg_reply_none();
2894}
2895
2896/**
2897 * link_reset_statistics - reset link statistics
2898 * @l_ptr: pointer to link
2899 */
2900
2901static void link_reset_statistics(struct tipc_link *l_ptr)
2902{
2903        memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2904        l_ptr->stats.sent_info = l_ptr->next_out_no;
2905        l_ptr->stats.recv_info = l_ptr->next_in_no;
2906}
2907
2908struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2909{
2910        char *link_name;
2911        struct tipc_link *l_ptr;
2912        struct tipc_node *node;
2913
2914        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2915                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2916
2917        link_name = (char *)TLV_DATA(req_tlv_area);
2918        if (!strcmp(link_name, tipc_bclink_name)) {
2919                if (tipc_bclink_reset_stats())
2920                        return tipc_cfg_reply_error_string("link not found");
2921                return tipc_cfg_reply_none();
2922        }
2923
2924        read_lock_bh(&tipc_net_lock);
2925        l_ptr = link_find_link(link_name, &node);
2926        if (!l_ptr) {
2927                read_unlock_bh(&tipc_net_lock);
2928                return tipc_cfg_reply_error_string("link not found");
2929        }
2930
2931        tipc_node_lock(node);
2932        link_reset_statistics(l_ptr);
2933        tipc_node_unlock(node);
2934        read_unlock_bh(&tipc_net_lock);
2935        return tipc_cfg_reply_none();
2936}
2937
/**
 * percent - convert count to a percentage of total (rounding up or down)
 * @count: part
 * @total: whole; must be non-zero
 *
 * Half of @total is added before dividing so the result is rounded to the
 * nearest integer.  All arithmetic is done in u32.
 */
static u32 percent(u32 count, u32 total)
{
	u32 rounding = total / 2;
	u32 scaled = count * 100;

	return (scaled + rounding) / total;
}
2946
2947/**
2948 * tipc_link_stats - print link statistics
2949 * @name: link name
2950 * @buf: print buffer area
2951 * @buf_size: size of print buffer area
2952 *
2953 * Returns length of print buffer data string (or 0 if error)
2954 */
2955
2956static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2957{
2958        struct print_buf pb;
2959        struct tipc_link *l_ptr;
2960        struct tipc_node *node;
2961        char *status;
2962        u32 profile_total = 0;
2963
2964        if (!strcmp(name, tipc_bclink_name))
2965                return tipc_bclink_stats(buf, buf_size);
2966
2967        tipc_printbuf_init(&pb, buf, buf_size);
2968
2969        read_lock_bh(&tipc_net_lock);
2970        l_ptr = link_find_link(name, &node);
2971        if (!l_ptr) {
2972                read_unlock_bh(&tipc_net_lock);
2973                return 0;
2974        }
2975        tipc_node_lock(node);
2976
2977        if (tipc_link_is_active(l_ptr))
2978                status = "ACTIVE";
2979        else if (tipc_link_is_up(l_ptr))
2980                status = "STANDBY";
2981        else
2982                status = "DEFUNCT";
2983        tipc_printf(&pb, "Link <%s>\n"
2984                         "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2985                         "  Window:%u packets\n",
2986                    l_ptr->name, status, l_ptr->max_pkt,
2987                    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
2988        tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2989                    l_ptr->next_in_no - l_ptr->stats.recv_info,
2990                    l_ptr->stats.recv_fragments,
2991                    l_ptr->stats.recv_fragmented,
2992                    l_ptr->stats.recv_bundles,
2993                    l_ptr->stats.recv_bundled);
2994        tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2995                    l_ptr->next_out_no - l_ptr->stats.sent_info,
2996                    l_ptr->stats.sent_fragments,
2997                    l_ptr->stats.sent_fragmented,
2998                    l_ptr->stats.sent_bundles,
2999                    l_ptr->stats.sent_bundled);
3000        profile_total = l_ptr->stats.msg_length_counts;
3001        if (!profile_total)
3002                profile_total = 1;
3003        tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3004                         "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3005                         "-16384:%u%% -32768:%u%% -66000:%u%%\n",
3006                    l_ptr->stats.msg_length_counts,
3007                    l_ptr->stats.msg_lengths_total / profile_total,
3008                    percent(l_ptr->stats.msg_length_profile[0], profile_total),
3009                    percent(l_ptr->stats.msg_length_profile[1], profile_total),
3010                    percent(l_ptr->stats.msg_length_profile[2], profile_total),
3011                    percent(l_ptr->stats.msg_length_profile[3], profile_total),
3012                    percent(l_ptr->stats.msg_length_profile[4], profile_total),
3013                    percent(l_ptr->stats.msg_length_profile[5], profile_total),
3014                    percent(l_ptr->stats.msg_length_profile[6], profile_total));
3015        tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3016                    l_ptr->stats.recv_states,
3017                    l_ptr->stats.recv_probes,
3018                    l_ptr->stats.recv_nacks,
3019                    l_ptr->stats.deferred_recv,
3020                    l_ptr->stats.duplicates);
3021        tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3022                    l_ptr->stats.sent_states,
3023                    l_ptr->stats.sent_probes,
3024                    l_ptr->stats.sent_nacks,
3025                    l_ptr->stats.sent_acks,
3026                    l_ptr->stats.retransmitted);
3027        tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3028                    l_ptr->stats.bearer_congs,
3029                    l_ptr->stats.link_congs,
3030                    l_ptr->stats.max_queue_sz,
3031                    l_ptr->stats.queue_sz_counts
3032                    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3033                    : 0);
3034
3035        tipc_node_unlock(node);
3036        read_unlock_bh(&tipc_net_lock);
3037        return tipc_printbuf_validate(&pb);
3038}
3039
3040#define MAX_LINK_STATS_INFO 2000
3041
3042struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3043{
3044        struct sk_buff *buf;
3045        struct tlv_desc *rep_tlv;
3046        int str_len;
3047
3048        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3049                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3050
3051        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3052        if (!buf)
3053                return NULL;
3054
3055        rep_tlv = (struct tlv_desc *)buf->data;
3056
3057        str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3058                                  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3059        if (!str_len) {
3060                buf_discard(buf);
3061                return tipc_cfg_reply_error_string("link not found");
3062        }
3063
3064        skb_put(buf, TLV_SPACE(str_len));
3065        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3066
3067        return buf;
3068}
3069
3070/**
3071 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3072 * @dest: network address of destination node
3073 * @selector: used to select from set of active links
3074 *
3075 * If no active link can be found, uses default maximum packet size.
3076 */
3077
3078u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3079{
3080        struct tipc_node *n_ptr;
3081        struct tipc_link *l_ptr;
3082        u32 res = MAX_PKT_DEFAULT;
3083
3084        if (dest == tipc_own_addr)
3085                return MAX_MSG_SIZE;
3086
3087        read_lock_bh(&tipc_net_lock);
3088        n_ptr = tipc_node_find(dest);
3089        if (n_ptr) {
3090                tipc_node_lock(n_ptr);
3091                l_ptr = n_ptr->active_links[selector & 1];
3092                if (l_ptr)
3093                        res = l_ptr->max_pkt;
3094                tipc_node_unlock(n_ptr);
3095        }
3096        read_unlock_bh(&tipc_net_lock);
3097        return res;
3098}
3099
3100static void link_print(struct tipc_link *l_ptr, const char *str)
3101{
3102        char print_area[256];
3103        struct print_buf pb;
3104        struct print_buf *buf = &pb;
3105
3106        tipc_printbuf_init(buf, print_area, sizeof(print_area));
3107
3108        tipc_printf(buf, str);
3109        tipc_printf(buf, "Link %x<%s>:",
3110                    l_ptr->addr, l_ptr->b_ptr->name);
3111
3112#ifdef CONFIG_TIPC_DEBUG
3113        if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3114                goto print_state;
3115
3116        tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3117        tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3118        tipc_printf(buf, "SQUE");
3119        if (l_ptr->first_out) {
3120                tipc_printf(buf, "[%u..", buf_seqno(l_ptr->first_out));
3121                if (l_ptr->next_out)
3122                        tipc_printf(buf, "%u..", buf_seqno(l_ptr->next_out));
3123                tipc_printf(buf, "%u]", buf_seqno(l_ptr->last_out));
3124                if ((mod(buf_seqno(l_ptr->last_out) -
3125                         buf_seqno(l_ptr->first_out))
3126                     != (l_ptr->out_queue_size - 1)) ||
3127                    (l_ptr->last_out->next != NULL)) {
3128                        tipc_printf(buf, "\nSend queue inconsistency\n");
3129                        tipc_printf(buf, "first_out= %p ", l_ptr->first_out);
3130                        tipc_printf(buf, "next_out= %p ", l_ptr->next_out);
3131                        tipc_printf(buf, "last_out= %p ", l_ptr->last_out);
3132                }
3133        } else
3134                tipc_printf(buf, "[]");
3135        tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3136        if (l_ptr->oldest_deferred_in) {
3137                u32 o = buf_seqno(l_ptr->oldest_deferred_in);
3138                u32 n = buf_seqno(l_ptr->newest_deferred_in);
3139                tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3140                if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3141                        tipc_printf(buf, ":RQSIZ(%u)",
3142                                    l_ptr->deferred_inqueue_sz);
3143                }
3144        }
3145print_state:
3146#endif
3147
3148        if (link_working_unknown(l_ptr))
3149                tipc_printf(buf, ":WU");
3150        else if (link_reset_reset(l_ptr))
3151                tipc_printf(buf, ":RR");
3152        else if (link_reset_unknown(l_ptr))
3153                tipc_printf(buf, ":RU");
3154        else if (link_working_working(l_ptr))
3155                tipc_printf(buf, ":WW");
3156        tipc_printf(buf, "\n");
3157
3158        tipc_printbuf_validate(buf);
3159        info("%s", print_area);
3160}
3161
3162