linux/net/tipc/port.c
<<
>>
Prefs
   1/*
   2 * net/tipc/port.c: TIPC port code
   3 *
   4 * Copyright (c) 1992-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "config.h"
  39#include "port.h"
  40#include "name_table.h"
  41
  42/* Connection management: */
  43#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
  44#define CONFIRMED 0
  45#define PROBING 1
  46
  47#define MAX_REJECT_SIZE 1024
  48
  49static struct sk_buff *msg_queue_head;
  50static struct sk_buff *msg_queue_tail;
  51
  52DEFINE_SPINLOCK(tipc_port_list_lock);
  53static DEFINE_SPINLOCK(queue_lock);
  54
  55static LIST_HEAD(ports);
  56static void port_handle_node_down(unsigned long ref);
  57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
  58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
  59static void port_timeout(unsigned long ref);
  60
  61
  62static u32 port_peernode(struct tipc_port *p_ptr)
  63{
  64        return msg_destnode(&p_ptr->phdr);
  65}
  66
  67static u32 port_peerport(struct tipc_port *p_ptr)
  68{
  69        return msg_destport(&p_ptr->phdr);
  70}
  71
  72/**
  73 * tipc_multicast - send a multicast message to local and remote destinations
  74 */
  75
  76int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
  77                   u32 num_sect, struct iovec const *msg_sect,
  78                   unsigned int total_len)
  79{
  80        struct tipc_msg *hdr;
  81        struct sk_buff *buf;
  82        struct sk_buff *ibuf = NULL;
  83        struct tipc_port_list dports = {0, NULL, };
  84        struct tipc_port *oport = tipc_port_deref(ref);
  85        int ext_targets;
  86        int res;
  87
  88        if (unlikely(!oport))
  89                return -EINVAL;
  90
  91        /* Create multicast message */
  92
  93        hdr = &oport->phdr;
  94        msg_set_type(hdr, TIPC_MCAST_MSG);
  95        msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
  96        msg_set_destport(hdr, 0);
  97        msg_set_destnode(hdr, 0);
  98        msg_set_nametype(hdr, seq->type);
  99        msg_set_namelower(hdr, seq->lower);
 100        msg_set_nameupper(hdr, seq->upper);
 101        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 102        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 103                        !oport->user_port, &buf);
 104        if (unlikely(!buf))
 105                return res;
 106
 107        /* Figure out where to send multicast message */
 108
 109        ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
 110                                                TIPC_NODE_SCOPE, &dports);
 111
 112        /* Send message to destinations (duplicate it only if necessary) */
 113
 114        if (ext_targets) {
 115                if (dports.count != 0) {
 116                        ibuf = skb_copy(buf, GFP_ATOMIC);
 117                        if (ibuf == NULL) {
 118                                tipc_port_list_free(&dports);
 119                                buf_discard(buf);
 120                                return -ENOMEM;
 121                        }
 122                }
 123                res = tipc_bclink_send_msg(buf);
 124                if ((res < 0) && (dports.count != 0))
 125                        buf_discard(ibuf);
 126        } else {
 127                ibuf = buf;
 128        }
 129
 130        if (res >= 0) {
 131                if (ibuf)
 132                        tipc_port_recv_mcast(ibuf, &dports);
 133        } else {
 134                tipc_port_list_free(&dports);
 135        }
 136        return res;
 137}
 138
 139/**
 140 * tipc_port_recv_mcast - deliver multicast message to all destination ports
 141 *
 142 * If there is no port list, perform a lookup to create one
 143 */
 144
 145void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
 146{
 147        struct tipc_msg *msg;
 148        struct tipc_port_list dports = {0, NULL, };
 149        struct tipc_port_list *item = dp;
 150        int cnt = 0;
 151
 152        msg = buf_msg(buf);
 153
 154        /* Create destination port list, if one wasn't supplied */
 155
 156        if (dp == NULL) {
 157                tipc_nametbl_mc_translate(msg_nametype(msg),
 158                                     msg_namelower(msg),
 159                                     msg_nameupper(msg),
 160                                     TIPC_CLUSTER_SCOPE,
 161                                     &dports);
 162                item = dp = &dports;
 163        }
 164
 165        /* Deliver a copy of message to each destination port */
 166
 167        if (dp->count != 0) {
 168                msg_set_destnode(msg, tipc_own_addr);
 169                if (dp->count == 1) {
 170                        msg_set_destport(msg, dp->ports[0]);
 171                        tipc_port_recv_msg(buf);
 172                        tipc_port_list_free(dp);
 173                        return;
 174                }
 175                for (; cnt < dp->count; cnt++) {
 176                        int index = cnt % PLSIZE;
 177                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 178
 179                        if (b == NULL) {
 180                                warn("Unable to deliver multicast message(s)\n");
 181                                goto exit;
 182                        }
 183                        if ((index == 0) && (cnt != 0))
 184                                item = item->next;
 185                        msg_set_destport(buf_msg(b), item->ports[index]);
 186                        tipc_port_recv_msg(b);
 187                }
 188        }
 189exit:
 190        buf_discard(buf);
 191        tipc_port_list_free(dp);
 192}
 193
 194/**
 195 * tipc_createport_raw - create a generic TIPC port
 196 *
 197 * Returns pointer to (locked) TIPC port, or NULL if unable to create it
 198 */
 199
 200struct tipc_port *tipc_createport_raw(void *usr_handle,
 201                        u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
 202                        void (*wakeup)(struct tipc_port *),
 203                        const u32 importance)
 204{
 205        struct tipc_port *p_ptr;
 206        struct tipc_msg *msg;
 207        u32 ref;
 208
 209        p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
 210        if (!p_ptr) {
 211                warn("Port creation failed, no memory\n");
 212                return NULL;
 213        }
 214        ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
 215        if (!ref) {
 216                warn("Port creation failed, reference table exhausted\n");
 217                kfree(p_ptr);
 218                return NULL;
 219        }
 220
 221        p_ptr->usr_handle = usr_handle;
 222        p_ptr->max_pkt = MAX_PKT_DEFAULT;
 223        p_ptr->ref = ref;
 224        msg = &p_ptr->phdr;
 225        tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
 226        msg_set_origport(msg, ref);
 227        INIT_LIST_HEAD(&p_ptr->wait_list);
 228        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
 229        p_ptr->dispatcher = dispatcher;
 230        p_ptr->wakeup = wakeup;
 231        p_ptr->user_port = NULL;
 232        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
 233        spin_lock_bh(&tipc_port_list_lock);
 234        INIT_LIST_HEAD(&p_ptr->publications);
 235        INIT_LIST_HEAD(&p_ptr->port_list);
 236        list_add_tail(&p_ptr->port_list, &ports);
 237        spin_unlock_bh(&tipc_port_list_lock);
 238        return p_ptr;
 239}
 240
 241int tipc_deleteport(u32 ref)
 242{
 243        struct tipc_port *p_ptr;
 244        struct sk_buff *buf = NULL;
 245
 246        tipc_withdraw(ref, 0, NULL);
 247        p_ptr = tipc_port_lock(ref);
 248        if (!p_ptr)
 249                return -EINVAL;
 250
 251        tipc_ref_discard(ref);
 252        tipc_port_unlock(p_ptr);
 253
 254        k_cancel_timer(&p_ptr->timer);
 255        if (p_ptr->connected) {
 256                buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 257                tipc_nodesub_unsubscribe(&p_ptr->subscription);
 258        }
 259        kfree(p_ptr->user_port);
 260
 261        spin_lock_bh(&tipc_port_list_lock);
 262        list_del(&p_ptr->port_list);
 263        list_del(&p_ptr->wait_list);
 264        spin_unlock_bh(&tipc_port_list_lock);
 265        k_term_timer(&p_ptr->timer);
 266        kfree(p_ptr);
 267        tipc_net_route_msg(buf);
 268        return 0;
 269}
 270
 271static int port_unreliable(struct tipc_port *p_ptr)
 272{
 273        return msg_src_droppable(&p_ptr->phdr);
 274}
 275
 276int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
 277{
 278        struct tipc_port *p_ptr;
 279
 280        p_ptr = tipc_port_lock(ref);
 281        if (!p_ptr)
 282                return -EINVAL;
 283        *isunreliable = port_unreliable(p_ptr);
 284        tipc_port_unlock(p_ptr);
 285        return 0;
 286}
 287
 288int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
 289{
 290        struct tipc_port *p_ptr;
 291
 292        p_ptr = tipc_port_lock(ref);
 293        if (!p_ptr)
 294                return -EINVAL;
 295        msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
 296        tipc_port_unlock(p_ptr);
 297        return 0;
 298}
 299
 300static int port_unreturnable(struct tipc_port *p_ptr)
 301{
 302        return msg_dest_droppable(&p_ptr->phdr);
 303}
 304
 305int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
 306{
 307        struct tipc_port *p_ptr;
 308
 309        p_ptr = tipc_port_lock(ref);
 310        if (!p_ptr)
 311                return -EINVAL;
 312        *isunrejectable = port_unreturnable(p_ptr);
 313        tipc_port_unlock(p_ptr);
 314        return 0;
 315}
 316
 317int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
 318{
 319        struct tipc_port *p_ptr;
 320
 321        p_ptr = tipc_port_lock(ref);
 322        if (!p_ptr)
 323                return -EINVAL;
 324        msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
 325        tipc_port_unlock(p_ptr);
 326        return 0;
 327}
 328
 329/*
 330 * port_build_proto_msg(): create connection protocol message for port
 331 *
 332 * On entry the port must be locked and connected.
 333 */
 334static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 335                                            u32 type, u32 ack)
 336{
 337        struct sk_buff *buf;
 338        struct tipc_msg *msg;
 339
 340        buf = tipc_buf_acquire(INT_H_SIZE);
 341        if (buf) {
 342                msg = buf_msg(buf);
 343                tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
 344                              port_peernode(p_ptr));
 345                msg_set_destport(msg, port_peerport(p_ptr));
 346                msg_set_origport(msg, p_ptr->ref);
 347                msg_set_msgcnt(msg, ack);
 348        }
 349        return buf;
 350}
 351
 352int tipc_reject_msg(struct sk_buff *buf, u32 err)
 353{
 354        struct tipc_msg *msg = buf_msg(buf);
 355        struct sk_buff *rbuf;
 356        struct tipc_msg *rmsg;
 357        int hdr_sz;
 358        u32 imp;
 359        u32 data_sz = msg_data_sz(msg);
 360        u32 src_node;
 361        u32 rmsg_sz;
 362
 363        /* discard rejected message if it shouldn't be returned to sender */
 364
 365        if (WARN(!msg_isdata(msg),
 366                 "attempt to reject message with user=%u", msg_user(msg))) {
 367                dump_stack();
 368                goto exit;
 369        }
 370        if (msg_errcode(msg) || msg_dest_droppable(msg))
 371                goto exit;
 372
 373        /*
 374         * construct returned message by copying rejected message header and
 375         * data (or subset), then updating header fields that need adjusting
 376         */
 377
 378        hdr_sz = msg_hdr_sz(msg);
 379        rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
 380
 381        rbuf = tipc_buf_acquire(rmsg_sz);
 382        if (rbuf == NULL)
 383                goto exit;
 384
 385        rmsg = buf_msg(rbuf);
 386        skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
 387
 388        if (msg_connected(rmsg)) {
 389                imp = msg_importance(rmsg);
 390                if (imp < TIPC_CRITICAL_IMPORTANCE)
 391                        msg_set_importance(rmsg, ++imp);
 392        }
 393        msg_set_non_seq(rmsg, 0);
 394        msg_set_size(rmsg, rmsg_sz);
 395        msg_set_errcode(rmsg, err);
 396        msg_set_prevnode(rmsg, tipc_own_addr);
 397        msg_swap_words(rmsg, 4, 5);
 398        if (!msg_short(rmsg))
 399                msg_swap_words(rmsg, 6, 7);
 400
 401        /* send self-abort message when rejecting on a connected port */
 402        if (msg_connected(msg)) {
 403                struct sk_buff *abuf = NULL;
 404                struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
 405
 406                if (p_ptr) {
 407                        if (p_ptr->connected)
 408                                abuf = port_build_self_abort_msg(p_ptr, err);
 409                        tipc_port_unlock(p_ptr);
 410                }
 411                tipc_net_route_msg(abuf);
 412        }
 413
 414        /* send returned message & dispose of rejected message */
 415
 416        src_node = msg_prevnode(msg);
 417        if (src_node == tipc_own_addr)
 418                tipc_port_recv_msg(rbuf);
 419        else
 420                tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
 421exit:
 422        buf_discard(buf);
 423        return data_sz;
 424}
 425
 426int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
 427                              struct iovec const *msg_sect, u32 num_sect,
 428                              unsigned int total_len, int err)
 429{
 430        struct sk_buff *buf;
 431        int res;
 432
 433        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 434                        !p_ptr->user_port, &buf);
 435        if (!buf)
 436                return res;
 437
 438        return tipc_reject_msg(buf, err);
 439}
 440
 441static void port_timeout(unsigned long ref)
 442{
 443        struct tipc_port *p_ptr = tipc_port_lock(ref);
 444        struct sk_buff *buf = NULL;
 445
 446        if (!p_ptr)
 447                return;
 448
 449        if (!p_ptr->connected) {
 450                tipc_port_unlock(p_ptr);
 451                return;
 452        }
 453
 454        /* Last probe answered ? */
 455        if (p_ptr->probing_state == PROBING) {
 456                buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 457        } else {
 458                buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
 459                p_ptr->probing_state = PROBING;
 460                k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 461        }
 462        tipc_port_unlock(p_ptr);
 463        tipc_net_route_msg(buf);
 464}
 465
 466
 467static void port_handle_node_down(unsigned long ref)
 468{
 469        struct tipc_port *p_ptr = tipc_port_lock(ref);
 470        struct sk_buff *buf = NULL;
 471
 472        if (!p_ptr)
 473                return;
 474        buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 475        tipc_port_unlock(p_ptr);
 476        tipc_net_route_msg(buf);
 477}
 478
 479
 480static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
 481{
 482        struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
 483
 484        if (buf) {
 485                struct tipc_msg *msg = buf_msg(buf);
 486                msg_swap_words(msg, 4, 5);
 487                msg_swap_words(msg, 6, 7);
 488        }
 489        return buf;
 490}
 491
 492
 493static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
 494{
 495        struct sk_buff *buf;
 496        struct tipc_msg *msg;
 497        u32 imp;
 498
 499        if (!p_ptr->connected)
 500                return NULL;
 501
 502        buf = tipc_buf_acquire(BASIC_H_SIZE);
 503        if (buf) {
 504                msg = buf_msg(buf);
 505                memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
 506                msg_set_hdr_sz(msg, BASIC_H_SIZE);
 507                msg_set_size(msg, BASIC_H_SIZE);
 508                imp = msg_importance(msg);
 509                if (imp < TIPC_CRITICAL_IMPORTANCE)
 510                        msg_set_importance(msg, ++imp);
 511                msg_set_errcode(msg, err);
 512        }
 513        return buf;
 514}
 515
 516void tipc_port_recv_proto_msg(struct sk_buff *buf)
 517{
 518        struct tipc_msg *msg = buf_msg(buf);
 519        struct tipc_port *p_ptr;
 520        struct sk_buff *r_buf = NULL;
 521        u32 orignode = msg_orignode(msg);
 522        u32 origport = msg_origport(msg);
 523        u32 destport = msg_destport(msg);
 524        int wakeable;
 525
 526        /* Validate connection */
 527
 528        p_ptr = tipc_port_lock(destport);
 529        if (!p_ptr || !p_ptr->connected ||
 530            (port_peernode(p_ptr) != orignode) ||
 531            (port_peerport(p_ptr) != origport)) {
 532                r_buf = tipc_buf_acquire(BASIC_H_SIZE);
 533                if (r_buf) {
 534                        msg = buf_msg(r_buf);
 535                        tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
 536                                      BASIC_H_SIZE, orignode);
 537                        msg_set_errcode(msg, TIPC_ERR_NO_PORT);
 538                        msg_set_origport(msg, destport);
 539                        msg_set_destport(msg, origport);
 540                }
 541                if (p_ptr)
 542                        tipc_port_unlock(p_ptr);
 543                goto exit;
 544        }
 545
 546        /* Process protocol message sent by peer */
 547
 548        switch (msg_type(msg)) {
 549        case CONN_ACK:
 550                wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
 551                        p_ptr->wakeup;
 552                p_ptr->acked += msg_msgcnt(msg);
 553                if (!tipc_port_congested(p_ptr)) {
 554                        p_ptr->congested = 0;
 555                        if (wakeable)
 556                                p_ptr->wakeup(p_ptr);
 557                }
 558                break;
 559        case CONN_PROBE:
 560                r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
 561                break;
 562        default:
 563                /* CONN_PROBE_REPLY or unrecognized - no action required */
 564                break;
 565        }
 566        p_ptr->probing_state = CONFIRMED;
 567        tipc_port_unlock(p_ptr);
 568exit:
 569        tipc_net_route_msg(r_buf);
 570        buf_discard(buf);
 571}
 572
 573static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
 574{
 575        struct publication *publ;
 576
 577        if (full_id)
 578                tipc_printf(buf, "<%u.%u.%u:%u>:",
 579                            tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
 580                            tipc_node(tipc_own_addr), p_ptr->ref);
 581        else
 582                tipc_printf(buf, "%-10u:", p_ptr->ref);
 583
 584        if (p_ptr->connected) {
 585                u32 dport = port_peerport(p_ptr);
 586                u32 destnode = port_peernode(p_ptr);
 587
 588                tipc_printf(buf, " connected to <%u.%u.%u:%u>",
 589                            tipc_zone(destnode), tipc_cluster(destnode),
 590                            tipc_node(destnode), dport);
 591                if (p_ptr->conn_type != 0)
 592                        tipc_printf(buf, " via {%u,%u}",
 593                                    p_ptr->conn_type,
 594                                    p_ptr->conn_instance);
 595        } else if (p_ptr->published) {
 596                tipc_printf(buf, " bound to");
 597                list_for_each_entry(publ, &p_ptr->publications, pport_list) {
 598                        if (publ->lower == publ->upper)
 599                                tipc_printf(buf, " {%u,%u}", publ->type,
 600                                            publ->lower);
 601                        else
 602                                tipc_printf(buf, " {%u,%u,%u}", publ->type,
 603                                            publ->lower, publ->upper);
 604                }
 605        }
 606        tipc_printf(buf, "\n");
 607}
 608
 609#define MAX_PORT_QUERY 32768
 610
 611struct sk_buff *tipc_port_get_ports(void)
 612{
 613        struct sk_buff *buf;
 614        struct tlv_desc *rep_tlv;
 615        struct print_buf pb;
 616        struct tipc_port *p_ptr;
 617        int str_len;
 618
 619        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
 620        if (!buf)
 621                return NULL;
 622        rep_tlv = (struct tlv_desc *)buf->data;
 623
 624        tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
 625        spin_lock_bh(&tipc_port_list_lock);
 626        list_for_each_entry(p_ptr, &ports, port_list) {
 627                spin_lock_bh(p_ptr->lock);
 628                port_print(p_ptr, &pb, 0);
 629                spin_unlock_bh(p_ptr->lock);
 630        }
 631        spin_unlock_bh(&tipc_port_list_lock);
 632        str_len = tipc_printbuf_validate(&pb);
 633
 634        skb_put(buf, TLV_SPACE(str_len));
 635        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
 636
 637        return buf;
 638}
 639
 640void tipc_port_reinit(void)
 641{
 642        struct tipc_port *p_ptr;
 643        struct tipc_msg *msg;
 644
 645        spin_lock_bh(&tipc_port_list_lock);
 646        list_for_each_entry(p_ptr, &ports, port_list) {
 647                msg = &p_ptr->phdr;
 648                if (msg_orignode(msg) == tipc_own_addr)
 649                        break;
 650                msg_set_prevnode(msg, tipc_own_addr);
 651                msg_set_orignode(msg, tipc_own_addr);
 652        }
 653        spin_unlock_bh(&tipc_port_list_lock);
 654}
 655
 656
 657/*
 658 *  port_dispatcher_sigh(): Signal handler for messages destinated
 659 *                          to the tipc_port interface.
 660 */
 661
 662static void port_dispatcher_sigh(void *dummy)
 663{
 664        struct sk_buff *buf;
 665
 666        spin_lock_bh(&queue_lock);
 667        buf = msg_queue_head;
 668        msg_queue_head = NULL;
 669        spin_unlock_bh(&queue_lock);
 670
 671        while (buf) {
 672                struct tipc_port *p_ptr;
 673                struct user_port *up_ptr;
 674                struct tipc_portid orig;
 675                struct tipc_name_seq dseq;
 676                void *usr_handle;
 677                int connected;
 678                int published;
 679                u32 message_type;
 680
 681                struct sk_buff *next = buf->next;
 682                struct tipc_msg *msg = buf_msg(buf);
 683                u32 dref = msg_destport(msg);
 684
 685                message_type = msg_type(msg);
 686                if (message_type > TIPC_DIRECT_MSG)
 687                        goto reject;    /* Unsupported message type */
 688
 689                p_ptr = tipc_port_lock(dref);
 690                if (!p_ptr)
 691                        goto reject;    /* Port deleted while msg in queue */
 692
 693                orig.ref = msg_origport(msg);
 694                orig.node = msg_orignode(msg);
 695                up_ptr = p_ptr->user_port;
 696                usr_handle = up_ptr->usr_handle;
 697                connected = p_ptr->connected;
 698                published = p_ptr->published;
 699
 700                if (unlikely(msg_errcode(msg)))
 701                        goto err;
 702
 703                switch (message_type) {
 704
 705                case TIPC_CONN_MSG:{
 706                                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
 707                                u32 peer_port = port_peerport(p_ptr);
 708                                u32 peer_node = port_peernode(p_ptr);
 709                                u32 dsz;
 710
 711                                tipc_port_unlock(p_ptr);
 712                                if (unlikely(!cb))
 713                                        goto reject;
 714                                if (unlikely(!connected)) {
 715                                        if (tipc_connect2port(dref, &orig))
 716                                                goto reject;
 717                                } else if ((msg_origport(msg) != peer_port) ||
 718                                           (msg_orignode(msg) != peer_node))
 719                                        goto reject;
 720                                dsz = msg_data_sz(msg);
 721                                if (unlikely(dsz &&
 722                                             (++p_ptr->conn_unacked >=
 723                                              TIPC_FLOW_CONTROL_WIN)))
 724                                        tipc_acknowledge(dref,
 725                                                         p_ptr->conn_unacked);
 726                                skb_pull(buf, msg_hdr_sz(msg));
 727                                cb(usr_handle, dref, &buf, msg_data(msg), dsz);
 728                                break;
 729                        }
 730                case TIPC_DIRECT_MSG:{
 731                                tipc_msg_event cb = up_ptr->msg_cb;
 732
 733                                tipc_port_unlock(p_ptr);
 734                                if (unlikely(!cb || connected))
 735                                        goto reject;
 736                                skb_pull(buf, msg_hdr_sz(msg));
 737                                cb(usr_handle, dref, &buf, msg_data(msg),
 738                                   msg_data_sz(msg), msg_importance(msg),
 739                                   &orig);
 740                                break;
 741                        }
 742                case TIPC_MCAST_MSG:
 743                case TIPC_NAMED_MSG:{
 744                                tipc_named_msg_event cb = up_ptr->named_msg_cb;
 745
 746                                tipc_port_unlock(p_ptr);
 747                                if (unlikely(!cb || connected || !published))
 748                                        goto reject;
 749                                dseq.type =  msg_nametype(msg);
 750                                dseq.lower = msg_nameinst(msg);
 751                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 752                                        ? dseq.lower : msg_nameupper(msg);
 753                                skb_pull(buf, msg_hdr_sz(msg));
 754                                cb(usr_handle, dref, &buf, msg_data(msg),
 755                                   msg_data_sz(msg), msg_importance(msg),
 756                                   &orig, &dseq);
 757                                break;
 758                        }
 759                }
 760                if (buf)
 761                        buf_discard(buf);
 762                buf = next;
 763                continue;
 764err:
 765                switch (message_type) {
 766
 767                case TIPC_CONN_MSG:{
 768                                tipc_conn_shutdown_event cb =
 769                                        up_ptr->conn_err_cb;
 770                                u32 peer_port = port_peerport(p_ptr);
 771                                u32 peer_node = port_peernode(p_ptr);
 772
 773                                tipc_port_unlock(p_ptr);
 774                                if (!cb || !connected)
 775                                        break;
 776                                if ((msg_origport(msg) != peer_port) ||
 777                                    (msg_orignode(msg) != peer_node))
 778                                        break;
 779                                tipc_disconnect(dref);
 780                                skb_pull(buf, msg_hdr_sz(msg));
 781                                cb(usr_handle, dref, &buf, msg_data(msg),
 782                                   msg_data_sz(msg), msg_errcode(msg));
 783                                break;
 784                        }
 785                case TIPC_DIRECT_MSG:{
 786                                tipc_msg_err_event cb = up_ptr->err_cb;
 787
 788                                tipc_port_unlock(p_ptr);
 789                                if (!cb || connected)
 790                                        break;
 791                                skb_pull(buf, msg_hdr_sz(msg));
 792                                cb(usr_handle, dref, &buf, msg_data(msg),
 793                                   msg_data_sz(msg), msg_errcode(msg), &orig);
 794                                break;
 795                        }
 796                case TIPC_MCAST_MSG:
 797                case TIPC_NAMED_MSG:{
 798                                tipc_named_msg_err_event cb =
 799                                        up_ptr->named_err_cb;
 800
 801                                tipc_port_unlock(p_ptr);
 802                                if (!cb || connected)
 803                                        break;
 804                                dseq.type =  msg_nametype(msg);
 805                                dseq.lower = msg_nameinst(msg);
 806                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 807                                        ? dseq.lower : msg_nameupper(msg);
 808                                skb_pull(buf, msg_hdr_sz(msg));
 809                                cb(usr_handle, dref, &buf, msg_data(msg),
 810                                   msg_data_sz(msg), msg_errcode(msg), &dseq);
 811                                break;
 812                        }
 813                }
 814                if (buf)
 815                        buf_discard(buf);
 816                buf = next;
 817                continue;
 818reject:
 819                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 820                buf = next;
 821        }
 822}
 823
 824/*
 825 *  port_dispatcher(): Dispatcher for messages destinated
 826 *  to the tipc_port interface. Called with port locked.
 827 */
 828
 829static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
 830{
 831        buf->next = NULL;
 832        spin_lock_bh(&queue_lock);
 833        if (msg_queue_head) {
 834                msg_queue_tail->next = buf;
 835                msg_queue_tail = buf;
 836        } else {
 837                msg_queue_tail = msg_queue_head = buf;
 838                tipc_k_signal((Handler)port_dispatcher_sigh, 0);
 839        }
 840        spin_unlock_bh(&queue_lock);
 841        return 0;
 842}
 843
 844/*
 845 * Wake up port after congestion: Called with port locked,
 846 *
 847 */
 848
 849static void port_wakeup_sh(unsigned long ref)
 850{
 851        struct tipc_port *p_ptr;
 852        struct user_port *up_ptr;
 853        tipc_continue_event cb = NULL;
 854        void *uh = NULL;
 855
 856        p_ptr = tipc_port_lock(ref);
 857        if (p_ptr) {
 858                up_ptr = p_ptr->user_port;
 859                if (up_ptr) {
 860                        cb = up_ptr->continue_event_cb;
 861                        uh = up_ptr->usr_handle;
 862                }
 863                tipc_port_unlock(p_ptr);
 864        }
 865        if (cb)
 866                cb(uh, ref);
 867}
 868
 869
 870static void port_wakeup(struct tipc_port *p_ptr)
 871{
 872        tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
 873}
 874
 875void tipc_acknowledge(u32 ref, u32 ack)
 876{
 877        struct tipc_port *p_ptr;
 878        struct sk_buff *buf = NULL;
 879
 880        p_ptr = tipc_port_lock(ref);
 881        if (!p_ptr)
 882                return;
 883        if (p_ptr->connected) {
 884                p_ptr->conn_unacked -= ack;
 885                buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
 886        }
 887        tipc_port_unlock(p_ptr);
 888        tipc_net_route_msg(buf);
 889}
 890
 891/*
 892 * tipc_createport(): user level call.
 893 */
 894
 895int tipc_createport(void *usr_handle,
 896                    unsigned int importance,
 897                    tipc_msg_err_event error_cb,
 898                    tipc_named_msg_err_event named_error_cb,
 899                    tipc_conn_shutdown_event conn_error_cb,
 900                    tipc_msg_event msg_cb,
 901                    tipc_named_msg_event named_msg_cb,
 902                    tipc_conn_msg_event conn_msg_cb,
 903                    tipc_continue_event continue_event_cb,/* May be zero */
 904                    u32 *portref)
 905{
 906        struct user_port *up_ptr;
 907        struct tipc_port *p_ptr;
 908
 909        up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
 910        if (!up_ptr) {
 911                warn("Port creation failed, no memory\n");
 912                return -ENOMEM;
 913        }
 914        p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
 915                                                   port_wakeup, importance);
 916        if (!p_ptr) {
 917                kfree(up_ptr);
 918                return -ENOMEM;
 919        }
 920
 921        p_ptr->user_port = up_ptr;
 922        up_ptr->usr_handle = usr_handle;
 923        up_ptr->ref = p_ptr->ref;
 924        up_ptr->err_cb = error_cb;
 925        up_ptr->named_err_cb = named_error_cb;
 926        up_ptr->conn_err_cb = conn_error_cb;
 927        up_ptr->msg_cb = msg_cb;
 928        up_ptr->named_msg_cb = named_msg_cb;
 929        up_ptr->conn_msg_cb = conn_msg_cb;
 930        up_ptr->continue_event_cb = continue_event_cb;
 931        *portref = p_ptr->ref;
 932        tipc_port_unlock(p_ptr);
 933        return 0;
 934}
 935
 936int tipc_portimportance(u32 ref, unsigned int *importance)
 937{
 938        struct tipc_port *p_ptr;
 939
 940        p_ptr = tipc_port_lock(ref);
 941        if (!p_ptr)
 942                return -EINVAL;
 943        *importance = (unsigned int)msg_importance(&p_ptr->phdr);
 944        tipc_port_unlock(p_ptr);
 945        return 0;
 946}
 947
 948int tipc_set_portimportance(u32 ref, unsigned int imp)
 949{
 950        struct tipc_port *p_ptr;
 951
 952        if (imp > TIPC_CRITICAL_IMPORTANCE)
 953                return -EINVAL;
 954
 955        p_ptr = tipc_port_lock(ref);
 956        if (!p_ptr)
 957                return -EINVAL;
 958        msg_set_importance(&p_ptr->phdr, (u32)imp);
 959        tipc_port_unlock(p_ptr);
 960        return 0;
 961}
 962
 963
 964int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
 965{
 966        struct tipc_port *p_ptr;
 967        struct publication *publ;
 968        u32 key;
 969        int res = -EINVAL;
 970
 971        p_ptr = tipc_port_lock(ref);
 972        if (!p_ptr)
 973                return -EINVAL;
 974
 975        if (p_ptr->connected)
 976                goto exit;
 977        if (seq->lower > seq->upper)
 978                goto exit;
 979        if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
 980                goto exit;
 981        key = ref + p_ptr->pub_count + 1;
 982        if (key == ref) {
 983                res = -EADDRINUSE;
 984                goto exit;
 985        }
 986        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
 987                                    scope, p_ptr->ref, key);
 988        if (publ) {
 989                list_add(&publ->pport_list, &p_ptr->publications);
 990                p_ptr->pub_count++;
 991                p_ptr->published = 1;
 992                res = 0;
 993        }
 994exit:
 995        tipc_port_unlock(p_ptr);
 996        return res;
 997}
 998
 999int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1000{
1001        struct tipc_port *p_ptr;
1002        struct publication *publ;
1003        struct publication *tpubl;
1004        int res = -EINVAL;
1005
1006        p_ptr = tipc_port_lock(ref);
1007        if (!p_ptr)
1008                return -EINVAL;
1009        if (!seq) {
1010                list_for_each_entry_safe(publ, tpubl,
1011                                         &p_ptr->publications, pport_list) {
1012                        tipc_nametbl_withdraw(publ->type, publ->lower,
1013                                              publ->ref, publ->key);
1014                }
1015                res = 0;
1016        } else {
1017                list_for_each_entry_safe(publ, tpubl,
1018                                         &p_ptr->publications, pport_list) {
1019                        if (publ->scope != scope)
1020                                continue;
1021                        if (publ->type != seq->type)
1022                                continue;
1023                        if (publ->lower != seq->lower)
1024                                continue;
1025                        if (publ->upper != seq->upper)
1026                                break;
1027                        tipc_nametbl_withdraw(publ->type, publ->lower,
1028                                              publ->ref, publ->key);
1029                        res = 0;
1030                        break;
1031                }
1032        }
1033        if (list_empty(&p_ptr->publications))
1034                p_ptr->published = 0;
1035        tipc_port_unlock(p_ptr);
1036        return res;
1037}
1038
1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1040{
1041        struct tipc_port *p_ptr;
1042        struct tipc_msg *msg;
1043        int res = -EINVAL;
1044
1045        p_ptr = tipc_port_lock(ref);
1046        if (!p_ptr)
1047                return -EINVAL;
1048        if (p_ptr->published || p_ptr->connected)
1049                goto exit;
1050        if (!peer->ref)
1051                goto exit;
1052
1053        msg = &p_ptr->phdr;
1054        msg_set_destnode(msg, peer->node);
1055        msg_set_destport(msg, peer->ref);
1056        msg_set_orignode(msg, tipc_own_addr);
1057        msg_set_origport(msg, p_ptr->ref);
1058        msg_set_type(msg, TIPC_CONN_MSG);
1059        msg_set_lookup_scope(msg, 0);
1060        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1061
1062        p_ptr->probing_interval = PROBING_INTERVAL;
1063        p_ptr->probing_state = CONFIRMED;
1064        p_ptr->connected = 1;
1065        k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1066
1067        tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1068                          (void *)(unsigned long)ref,
1069                          (net_ev_handler)port_handle_node_down);
1070        res = 0;
1071exit:
1072        tipc_port_unlock(p_ptr);
1073        p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1074        return res;
1075}
1076
1077/**
1078 * tipc_disconnect_port - disconnect port from peer
1079 *
1080 * Port must be locked.
1081 */
1082
1083int tipc_disconnect_port(struct tipc_port *tp_ptr)
1084{
1085        int res;
1086
1087        if (tp_ptr->connected) {
1088                tp_ptr->connected = 0;
1089                /* let timer expire on it's own to avoid deadlock! */
1090                tipc_nodesub_unsubscribe(
1091                        &((struct tipc_port *)tp_ptr)->subscription);
1092                res = 0;
1093        } else {
1094                res = -ENOTCONN;
1095        }
1096        return res;
1097}
1098
1099/*
1100 * tipc_disconnect(): Disconnect port form peer.
1101 *                    This is a node local operation.
1102 */
1103
1104int tipc_disconnect(u32 ref)
1105{
1106        struct tipc_port *p_ptr;
1107        int res;
1108
1109        p_ptr = tipc_port_lock(ref);
1110        if (!p_ptr)
1111                return -EINVAL;
1112        res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1113        tipc_port_unlock(p_ptr);
1114        return res;
1115}
1116
1117/*
1118 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1119 */
1120int tipc_shutdown(u32 ref)
1121{
1122        struct tipc_port *p_ptr;
1123        struct sk_buff *buf = NULL;
1124
1125        p_ptr = tipc_port_lock(ref);
1126        if (!p_ptr)
1127                return -EINVAL;
1128
1129        buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1130        tipc_port_unlock(p_ptr);
1131        tipc_net_route_msg(buf);
1132        return tipc_disconnect(ref);
1133}
1134
1135/*
1136 *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1137 *                        message for this node.
1138 */
1139
1140static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1141                                   struct iovec const *msg_sect,
1142                                   unsigned int total_len)
1143{
1144        struct sk_buff *buf;
1145        int res;
1146
1147        res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1148                        MAX_MSG_SIZE, !sender->user_port, &buf);
1149        if (likely(buf))
1150                tipc_port_recv_msg(buf);
1151        return res;
1152}
1153
1154/**
1155 * tipc_send - send message sections on connection
1156 */
1157
1158int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1159              unsigned int total_len)
1160{
1161        struct tipc_port *p_ptr;
1162        u32 destnode;
1163        int res;
1164
1165        p_ptr = tipc_port_deref(ref);
1166        if (!p_ptr || !p_ptr->connected)
1167                return -EINVAL;
1168
1169        p_ptr->congested = 1;
1170        if (!tipc_port_congested(p_ptr)) {
1171                destnode = port_peernode(p_ptr);
1172                if (likely(destnode != tipc_own_addr))
1173                        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1174                                                           total_len, destnode);
1175                else
1176                        res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1177                                                      total_len);
1178
1179                if (likely(res != -ELINKCONG)) {
1180                        p_ptr->congested = 0;
1181                        if (res > 0)
1182                                p_ptr->sent++;
1183                        return res;
1184                }
1185        }
1186        if (port_unreliable(p_ptr)) {
1187                p_ptr->congested = 0;
1188                return total_len;
1189        }
1190        return -ELINKCONG;
1191}
1192
1193/**
1194 * tipc_send2name - send message sections to port name
1195 */
1196
1197int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1198                   unsigned int num_sect, struct iovec const *msg_sect,
1199                   unsigned int total_len)
1200{
1201        struct tipc_port *p_ptr;
1202        struct tipc_msg *msg;
1203        u32 destnode = domain;
1204        u32 destport;
1205        int res;
1206
1207        p_ptr = tipc_port_deref(ref);
1208        if (!p_ptr || p_ptr->connected)
1209                return -EINVAL;
1210
1211        msg = &p_ptr->phdr;
1212        msg_set_type(msg, TIPC_NAMED_MSG);
1213        msg_set_orignode(msg, tipc_own_addr);
1214        msg_set_origport(msg, ref);
1215        msg_set_hdr_sz(msg, NAMED_H_SIZE);
1216        msg_set_nametype(msg, name->type);
1217        msg_set_nameinst(msg, name->instance);
1218        msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1219        destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1220        msg_set_destnode(msg, destnode);
1221        msg_set_destport(msg, destport);
1222
1223        if (likely(destport)) {
1224                if (likely(destnode == tipc_own_addr))
1225                        res = tipc_port_recv_sections(p_ptr, num_sect,
1226                                                      msg_sect, total_len);
1227                else
1228                        res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1229                                                           num_sect, total_len,
1230                                                           destnode);
1231                if (likely(res != -ELINKCONG)) {
1232                        if (res > 0)
1233                                p_ptr->sent++;
1234                        return res;
1235                }
1236                if (port_unreliable(p_ptr)) {
1237                        return total_len;
1238                }
1239                return -ELINKCONG;
1240        }
1241        return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1242                                         total_len, TIPC_ERR_NO_NAME);
1243}
1244
1245/**
1246 * tipc_send2port - send message sections to port identity
1247 */
1248
1249int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1250                   unsigned int num_sect, struct iovec const *msg_sect,
1251                   unsigned int total_len)
1252{
1253        struct tipc_port *p_ptr;
1254        struct tipc_msg *msg;
1255        int res;
1256
1257        p_ptr = tipc_port_deref(ref);
1258        if (!p_ptr || p_ptr->connected)
1259                return -EINVAL;
1260
1261        msg = &p_ptr->phdr;
1262        msg_set_type(msg, TIPC_DIRECT_MSG);
1263        msg_set_lookup_scope(msg, 0);
1264        msg_set_orignode(msg, tipc_own_addr);
1265        msg_set_origport(msg, ref);
1266        msg_set_destnode(msg, dest->node);
1267        msg_set_destport(msg, dest->ref);
1268        msg_set_hdr_sz(msg, BASIC_H_SIZE);
1269
1270        if (dest->node == tipc_own_addr)
1271                res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1272                                               total_len);
1273        else
1274                res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1275                                                   total_len, dest->node);
1276        if (likely(res != -ELINKCONG)) {
1277                if (res > 0)
1278                        p_ptr->sent++;
1279                return res;
1280        }
1281        if (port_unreliable(p_ptr)) {
1282                return total_len;
1283        }
1284        return -ELINKCONG;
1285}
1286
1287/**
1288 * tipc_send_buf2port - send message buffer to port identity
1289 */
1290
1291int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1292               struct sk_buff *buf, unsigned int dsz)
1293{
1294        struct tipc_port *p_ptr;
1295        struct tipc_msg *msg;
1296        int res;
1297
1298        p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1299        if (!p_ptr || p_ptr->connected)
1300                return -EINVAL;
1301
1302        msg = &p_ptr->phdr;
1303        msg_set_type(msg, TIPC_DIRECT_MSG);
1304        msg_set_orignode(msg, tipc_own_addr);
1305        msg_set_origport(msg, ref);
1306        msg_set_destnode(msg, dest->node);
1307        msg_set_destport(msg, dest->ref);
1308        msg_set_hdr_sz(msg, BASIC_H_SIZE);
1309        msg_set_size(msg, BASIC_H_SIZE + dsz);
1310        if (skb_cow(buf, BASIC_H_SIZE))
1311                return -ENOMEM;
1312
1313        skb_push(buf, BASIC_H_SIZE);
1314        skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1315
1316        if (dest->node == tipc_own_addr)
1317                res = tipc_port_recv_msg(buf);
1318        else
1319                res = tipc_send_buf_fast(buf, dest->node);
1320        if (likely(res != -ELINKCONG)) {
1321                if (res > 0)
1322                        p_ptr->sent++;
1323                return res;
1324        }
1325        if (port_unreliable(p_ptr))
1326                return dsz;
1327        return -ELINKCONG;
1328}
1329
1330