linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
  29 * The comms level is two kernel threads that deal mainly with
  30 * the receiving of messages from other nodes and passing them
  31 * up to the mid-level comms layer (which understands the
  32 * message format) for execution by the locking core, and
  33 * a send thread which does all the setting up of connections
  34 * to remote nodes and the sending of data. Threads are not allowed
  35 * to send their own data because it may cause them to wait in times
  36 * of high load. Also, this way, the sending thread can collect together
  37 * messages bound for one node and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
  63#define NEEDED_RMEM (4*1024*1024)
  64#define CONN_HASH_SIZE 32
  65
  66/* Number of messages to send before rescheduling */
  67#define MAX_SEND_MSG_COUNT 25
  68
  69struct cbuf {
  70        unsigned int base;
  71        unsigned int len;
  72        unsigned int mask;
  73};
  74
  75static void cbuf_add(struct cbuf *cb, int n)
  76{
  77        cb->len += n;
  78}
  79
  80static int cbuf_data(struct cbuf *cb)
  81{
  82        return ((cb->base + cb->len) & cb->mask);
  83}
  84
  85static void cbuf_init(struct cbuf *cb, int size)
  86{
  87        cb->base = cb->len = 0;
  88        cb->mask = size-1;
  89}
  90
  91static void cbuf_eat(struct cbuf *cb, int n)
  92{
  93        cb->len  -= n;
  94        cb->base += n;
  95        cb->base &= cb->mask;
  96}
  97
  98static bool cbuf_empty(struct cbuf *cb)
  99{
 100        return cb->len == 0;
 101}
 102
 103struct connection {
 104        struct socket *sock;    /* NULL if not connected */
 105        uint32_t nodeid;        /* So we know who we are in the list */
 106        struct mutex sock_mutex;
 107        unsigned long flags;
 108#define CF_READ_PENDING 1
 109#define CF_WRITE_PENDING 2
 110#define CF_CONNECT_PENDING 3
 111#define CF_INIT_PENDING 4
 112#define CF_IS_OTHERCON 5
 113#define CF_CLOSE 6
 114#define CF_APP_LIMITED 7
 115        struct list_head writequeue;  /* List of outgoing writequeue_entries */
 116        spinlock_t writequeue_lock;
 117        int (*rx_action) (struct connection *); /* What to do when active */
 118        void (*connect_action) (struct connection *);   /* What to do to connect */
 119        struct page *rx_page;
 120        struct cbuf cb;
 121        int retries;
 122#define MAX_CONNECT_RETRIES 3
 123        int sctp_assoc;
 124        struct hlist_node list;
 125        struct connection *othercon;
 126        struct work_struct rwork; /* Receive workqueue */
 127        struct work_struct swork; /* Send workqueue */
 128        bool try_new_addr;
 129};
 130#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 131
 132/* An entry waiting to be sent */
 133struct writequeue_entry {
 134        struct list_head list;
 135        struct page *page;
 136        int offset;
 137        int len;
 138        int end;
 139        int users;
 140        struct connection *con;
 141};
 142
 143struct dlm_node_addr {
 144        struct list_head list;
 145        int nodeid;
 146        int addr_count;
 147        int curr_addr_index;
 148        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 149};
 150
 151static LIST_HEAD(dlm_node_addrs);
 152static DEFINE_SPINLOCK(dlm_node_addrs_spin);
 153
 154static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 155static int dlm_local_count;
 156static int dlm_allow_conn;
 157
 158/* Work queues */
 159static struct workqueue_struct *recv_workqueue;
 160static struct workqueue_struct *send_workqueue;
 161
 162static struct hlist_head connection_hash[CONN_HASH_SIZE];
 163static DEFINE_MUTEX(connections_lock);
 164static struct kmem_cache *con_cache;
 165
 166static void process_recv_sockets(struct work_struct *work);
 167static void process_send_sockets(struct work_struct *work);
 168
 169
 170/* This is deliberately very simple because most clusters have simple
 171   sequential nodeids, so we should be able to go straight to a connection
 172   struct in the array */
 173static inline int nodeid_hash(int nodeid)
 174{
 175        return nodeid & (CONN_HASH_SIZE-1);
 176}
 177
 178static struct connection *__find_con(int nodeid)
 179{
 180        int r;
 181        struct connection *con;
 182
 183        r = nodeid_hash(nodeid);
 184
 185        hlist_for_each_entry(con, &connection_hash[r], list) {
 186                if (con->nodeid == nodeid)
 187                        return con;
 188        }
 189        return NULL;
 190}
 191
 192/*
 193 * If 'allocation' is zero then we don't attempt to create a new
 194 * connection structure for this node.
 195 */
 196static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 197{
 198        struct connection *con = NULL;
 199        int r;
 200
 201        con = __find_con(nodeid);
 202        if (con || !alloc)
 203                return con;
 204
 205        con = kmem_cache_zalloc(con_cache, alloc);
 206        if (!con)
 207                return NULL;
 208
 209        r = nodeid_hash(nodeid);
 210        hlist_add_head(&con->list, &connection_hash[r]);
 211
 212        con->nodeid = nodeid;
 213        mutex_init(&con->sock_mutex);
 214        INIT_LIST_HEAD(&con->writequeue);
 215        spin_lock_init(&con->writequeue_lock);
 216        INIT_WORK(&con->swork, process_send_sockets);
 217        INIT_WORK(&con->rwork, process_recv_sockets);
 218
 219        /* Setup action pointers for child sockets */
 220        if (con->nodeid) {
 221                struct connection *zerocon = __find_con(0);
 222
 223                con->connect_action = zerocon->connect_action;
 224                if (!con->rx_action)
 225                        con->rx_action = zerocon->rx_action;
 226        }
 227
 228        return con;
 229}
 230
 231/* Loop round all connections */
 232static void foreach_conn(void (*conn_func)(struct connection *c))
 233{
 234        int i;
 235        struct hlist_node *n;
 236        struct connection *con;
 237
 238        for (i = 0; i < CONN_HASH_SIZE; i++) {
 239                hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
 240                        conn_func(con);
 241        }
 242}
 243
 244static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 245{
 246        struct connection *con;
 247
 248        mutex_lock(&connections_lock);
 249        con = __nodeid2con(nodeid, allocation);
 250        mutex_unlock(&connections_lock);
 251
 252        return con;
 253}
 254
 255/* This is a bit drastic, but only called when things go wrong */
 256static struct connection *assoc2con(int assoc_id)
 257{
 258        int i;
 259        struct connection *con;
 260
 261        mutex_lock(&connections_lock);
 262
 263        for (i = 0 ; i < CONN_HASH_SIZE; i++) {
 264                hlist_for_each_entry(con, &connection_hash[i], list) {
 265                        if (con->sctp_assoc == assoc_id) {
 266                                mutex_unlock(&connections_lock);
 267                                return con;
 268                        }
 269                }
 270        }
 271        mutex_unlock(&connections_lock);
 272        return NULL;
 273}
 274
 275static struct dlm_node_addr *find_node_addr(int nodeid)
 276{
 277        struct dlm_node_addr *na;
 278
 279        list_for_each_entry(na, &dlm_node_addrs, list) {
 280                if (na->nodeid == nodeid)
 281                        return na;
 282        }
 283        return NULL;
 284}
 285
 286static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 287{
 288        switch (x->ss_family) {
 289        case AF_INET: {
 290                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 291                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 292                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 293                        return 0;
 294                if (sinx->sin_port != siny->sin_port)
 295                        return 0;
 296                break;
 297        }
 298        case AF_INET6: {
 299                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 300                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 301                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 302                        return 0;
 303                if (sinx->sin6_port != siny->sin6_port)
 304                        return 0;
 305                break;
 306        }
 307        default:
 308                return 0;
 309        }
 310        return 1;
 311}
 312
 313static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 314                          struct sockaddr *sa_out, bool try_new_addr)
 315{
 316        struct sockaddr_storage sas;
 317        struct dlm_node_addr *na;
 318
 319        if (!dlm_local_count)
 320                return -1;
 321
 322        spin_lock(&dlm_node_addrs_spin);
 323        na = find_node_addr(nodeid);
 324        if (na && na->addr_count) {
 325                if (try_new_addr) {
 326                        na->curr_addr_index++;
 327                        if (na->curr_addr_index == na->addr_count)
 328                                na->curr_addr_index = 0;
 329                }
 330
 331                memcpy(&sas, na->addr[na->curr_addr_index ],
 332                        sizeof(struct sockaddr_storage));
 333        }
 334        spin_unlock(&dlm_node_addrs_spin);
 335
 336        if (!na)
 337                return -EEXIST;
 338
 339        if (!na->addr_count)
 340                return -ENOENT;
 341
 342        if (sas_out)
 343                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 344
 345        if (!sa_out)
 346                return 0;
 347
 348        if (dlm_local_addr[0]->ss_family == AF_INET) {
 349                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 350                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 351                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 352        } else {
 353                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 354                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 355                ret6->sin6_addr = in6->sin6_addr;
 356        }
 357
 358        return 0;
 359}
 360
 361static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 362{
 363        struct dlm_node_addr *na;
 364        int rv = -EEXIST;
 365        int addr_i;
 366
 367        spin_lock(&dlm_node_addrs_spin);
 368        list_for_each_entry(na, &dlm_node_addrs, list) {
 369                if (!na->addr_count)
 370                        continue;
 371
 372                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 373                        if (addr_compare(na->addr[addr_i], addr)) {
 374                                *nodeid = na->nodeid;
 375                                rv = 0;
 376                                goto unlock;
 377                        }
 378                }
 379        }
 380unlock:
 381        spin_unlock(&dlm_node_addrs_spin);
 382        return rv;
 383}
 384
 385int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 386{
 387        struct sockaddr_storage *new_addr;
 388        struct dlm_node_addr *new_node, *na;
 389
 390        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 391        if (!new_node)
 392                return -ENOMEM;
 393
 394        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 395        if (!new_addr) {
 396                kfree(new_node);
 397                return -ENOMEM;
 398        }
 399
 400        memcpy(new_addr, addr, len);
 401
 402        spin_lock(&dlm_node_addrs_spin);
 403        na = find_node_addr(nodeid);
 404        if (!na) {
 405                new_node->nodeid = nodeid;
 406                new_node->addr[0] = new_addr;
 407                new_node->addr_count = 1;
 408                list_add(&new_node->list, &dlm_node_addrs);
 409                spin_unlock(&dlm_node_addrs_spin);
 410                return 0;
 411        }
 412
 413        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 414                spin_unlock(&dlm_node_addrs_spin);
 415                kfree(new_addr);
 416                kfree(new_node);
 417                return -ENOSPC;
 418        }
 419
 420        na->addr[na->addr_count++] = new_addr;
 421        spin_unlock(&dlm_node_addrs_spin);
 422        kfree(new_node);
 423        return 0;
 424}
 425
 426/* Data available on socket or listen socket received a connect */
 427static void lowcomms_data_ready(struct sock *sk, int count_unused)
 428{
 429        struct connection *con = sock2con(sk);
 430        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 431                queue_work(recv_workqueue, &con->rwork);
 432}
 433
 434static void lowcomms_write_space(struct sock *sk)
 435{
 436        struct connection *con = sock2con(sk);
 437
 438        if (!con)
 439                return;
 440
 441        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 442
 443        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 444                con->sock->sk->sk_write_pending--;
 445                clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
 446        }
 447
 448        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 449                queue_work(send_workqueue, &con->swork);
 450}
 451
 452static inline void lowcomms_connect_sock(struct connection *con)
 453{
 454        if (test_bit(CF_CLOSE, &con->flags))
 455                return;
 456        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 457                queue_work(send_workqueue, &con->swork);
 458}
 459
 460static void lowcomms_state_change(struct sock *sk)
 461{
 462        if (sk->sk_state == TCP_ESTABLISHED)
 463                lowcomms_write_space(sk);
 464}
 465
 466int dlm_lowcomms_connect_node(int nodeid)
 467{
 468        struct connection *con;
 469
 470        /* with sctp there's no connecting without sending */
 471        if (dlm_config.ci_protocol != 0)
 472                return 0;
 473
 474        if (nodeid == dlm_our_nodeid())
 475                return 0;
 476
 477        con = nodeid2con(nodeid, GFP_NOFS);
 478        if (!con)
 479                return -ENOMEM;
 480        lowcomms_connect_sock(con);
 481        return 0;
 482}
 483
 484/* Make a socket active */
 485static void add_sock(struct socket *sock, struct connection *con)
 486{
 487        con->sock = sock;
 488
 489        /* Install a data_ready callback */
 490        con->sock->sk->sk_data_ready = lowcomms_data_ready;
 491        con->sock->sk->sk_write_space = lowcomms_write_space;
 492        con->sock->sk->sk_state_change = lowcomms_state_change;
 493        con->sock->sk->sk_user_data = con;
 494        con->sock->sk->sk_allocation = GFP_NOFS;
 495}
 496
 497/* Add the port number to an IPv6 or 4 sockaddr and return the address
 498   length */
 499static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 500                          int *addr_len)
 501{
 502        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 503        if (saddr->ss_family == AF_INET) {
 504                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 505                in4_addr->sin_port = cpu_to_be16(port);
 506                *addr_len = sizeof(struct sockaddr_in);
 507                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 508        } else {
 509                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 510                in6_addr->sin6_port = cpu_to_be16(port);
 511                *addr_len = sizeof(struct sockaddr_in6);
 512        }
 513        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 514}
 515
 516/* Close a remote connection and tidy up */
 517static void close_connection(struct connection *con, bool and_other)
 518{
 519        mutex_lock(&con->sock_mutex);
 520
 521        if (con->sock) {
 522                sock_release(con->sock);
 523                con->sock = NULL;
 524        }
 525        if (con->othercon && and_other) {
 526                /* Will only re-enter once. */
 527                close_connection(con->othercon, false);
 528        }
 529        if (con->rx_page) {
 530                __free_page(con->rx_page);
 531                con->rx_page = NULL;
 532        }
 533
 534        con->retries = 0;
 535        mutex_unlock(&con->sock_mutex);
 536}
 537
 538/* We only send shutdown messages to nodes that are not part of the cluster */
 539static void sctp_send_shutdown(sctp_assoc_t associd)
 540{
 541        static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 542        struct msghdr outmessage;
 543        struct cmsghdr *cmsg;
 544        struct sctp_sndrcvinfo *sinfo;
 545        int ret;
 546        struct connection *con;
 547
 548        con = nodeid2con(0,0);
 549        BUG_ON(con == NULL);
 550
 551        outmessage.msg_name = NULL;
 552        outmessage.msg_namelen = 0;
 553        outmessage.msg_control = outcmsg;
 554        outmessage.msg_controllen = sizeof(outcmsg);
 555        outmessage.msg_flags = MSG_EOR;
 556
 557        cmsg = CMSG_FIRSTHDR(&outmessage);
 558        cmsg->cmsg_level = IPPROTO_SCTP;
 559        cmsg->cmsg_type = SCTP_SNDRCV;
 560        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 561        outmessage.msg_controllen = cmsg->cmsg_len;
 562        sinfo = CMSG_DATA(cmsg);
 563        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 564
 565        sinfo->sinfo_flags |= MSG_EOF;
 566        sinfo->sinfo_assoc_id = associd;
 567
 568        ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
 569
 570        if (ret != 0)
 571                log_print("send EOF to node failed: %d", ret);
 572}
 573
 574static void sctp_init_failed_foreach(struct connection *con)
 575{
 576
 577        /*
 578         * Don't try to recover base con and handle race where the
 579         * other node's assoc init creates a assoc and we get that
 580         * notification, then we get a notification that our attempt
 581         * failed due. This happens when we are still trying the primary
 582         * address, but the other node has already tried secondary addrs
 583         * and found one that worked.
 584         */
 585        if (!con->nodeid || con->sctp_assoc)
 586                return;
 587
 588        log_print("Retrying SCTP association init for node %d\n", con->nodeid);
 589
 590        con->try_new_addr = true;
 591        con->sctp_assoc = 0;
 592        if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
 593                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 594                        queue_work(send_workqueue, &con->swork);
 595        }
 596}
 597
 598/* INIT failed but we don't know which node...
 599   restart INIT on all pending nodes */
 600static void sctp_init_failed(void)
 601{
 602        mutex_lock(&connections_lock);
 603
 604        foreach_conn(sctp_init_failed_foreach);
 605
 606        mutex_unlock(&connections_lock);
 607}
 608
 609static void retry_failed_sctp_send(struct connection *recv_con,
 610                                   struct sctp_send_failed *sn_send_failed,
 611                                   char *buf)
 612{
 613        int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
 614        struct dlm_mhandle *mh;
 615        struct connection *con;
 616        char *retry_buf;
 617        int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
 618
 619        log_print("Retry sending %d bytes to node id %d", len, nodeid);
 620
 621        con = nodeid2con(nodeid, 0);
 622        if (!con) {
 623                log_print("Could not look up con for nodeid %d\n",
 624                          nodeid);
 625                return;
 626        }
 627
 628        mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
 629        if (!mh) {
 630                log_print("Could not allocate buf for retry.");
 631                return;
 632        }
 633        memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
 634        dlm_lowcomms_commit_buffer(mh);
 635
 636        /*
 637         * If we got a assoc changed event before the send failed event then
 638         * we only need to retry the send.
 639         */
 640        if (con->sctp_assoc) {
 641                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 642                        queue_work(send_workqueue, &con->swork);
 643        } else
 644                sctp_init_failed_foreach(con);
 645}
 646
 647/* Something happened to an association */
 648static void process_sctp_notification(struct connection *con,
 649                                      struct msghdr *msg, char *buf)
 650{
 651        union sctp_notification *sn = (union sctp_notification *)buf;
 652
 653        switch (sn->sn_header.sn_type) {
 654        case SCTP_SEND_FAILED:
 655                retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
 656                break;
 657        case SCTP_ASSOC_CHANGE:
 658                switch (sn->sn_assoc_change.sac_state) {
 659                case SCTP_COMM_UP:
 660                case SCTP_RESTART:
 661                {
 662                        /* Check that the new node is in the lockspace */
 663                        struct sctp_prim prim;
 664                        int nodeid;
 665                        int prim_len, ret;
 666                        int addr_len;
 667                        struct connection *new_con;
 668
 669                        /*
 670                         * We get this before any data for an association.
 671                         * We verify that the node is in the cluster and
 672                         * then peel off a socket for it.
 673                         */
 674                        if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
 675                                log_print("COMM_UP for invalid assoc ID %d",
 676                                         (int)sn->sn_assoc_change.sac_assoc_id);
 677                                sctp_init_failed();
 678                                return;
 679                        }
 680                        memset(&prim, 0, sizeof(struct sctp_prim));
 681                        prim_len = sizeof(struct sctp_prim);
 682                        prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
 683
 684                        ret = kernel_getsockopt(con->sock,
 685                                                IPPROTO_SCTP,
 686                                                SCTP_PRIMARY_ADDR,
 687                                                (char*)&prim,
 688                                                &prim_len);
 689                        if (ret < 0) {
 690                                log_print("getsockopt/sctp_primary_addr on "
 691                                          "new assoc %d failed : %d",
 692                                          (int)sn->sn_assoc_change.sac_assoc_id,
 693                                          ret);
 694
 695                                /* Retry INIT later */
 696                                new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 697                                if (new_con)
 698                                        clear_bit(CF_CONNECT_PENDING, &con->flags);
 699                                return;
 700                        }
 701                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 702                        if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 703                                unsigned char *b=(unsigned char *)&prim.ssp_addr;
 704                                log_print("reject connect from unknown addr");
 705                                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 706                                                     b, sizeof(struct sockaddr_storage));
 707                                sctp_send_shutdown(prim.ssp_assoc_id);
 708                                return;
 709                        }
 710
 711                        new_con = nodeid2con(nodeid, GFP_NOFS);
 712                        if (!new_con)
 713                                return;
 714
 715                        /* Peel off a new sock */
 716                        sctp_lock_sock(con->sock->sk);
 717                        ret = sctp_do_peeloff(con->sock->sk,
 718                                sn->sn_assoc_change.sac_assoc_id,
 719                                &new_con->sock);
 720                        sctp_release_sock(con->sock->sk);
 721                        if (ret < 0) {
 722                                log_print("Can't peel off a socket for "
 723                                          "connection %d to node %d: err=%d",
 724                                          (int)sn->sn_assoc_change.sac_assoc_id,
 725                                          nodeid, ret);
 726                                return;
 727                        }
 728                        add_sock(new_con->sock, new_con);
 729
 730                        log_print("connecting to %d sctp association %d",
 731                                 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
 732
 733                        new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
 734                        new_con->try_new_addr = false;
 735                        /* Send any pending writes */
 736                        clear_bit(CF_CONNECT_PENDING, &new_con->flags);
 737                        clear_bit(CF_INIT_PENDING, &new_con->flags);
 738                        if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
 739                                queue_work(send_workqueue, &new_con->swork);
 740                        }
 741                        if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
 742                                queue_work(recv_workqueue, &new_con->rwork);
 743                }
 744                break;
 745
 746                case SCTP_COMM_LOST:
 747                case SCTP_SHUTDOWN_COMP:
 748                {
 749                        con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 750                        if (con) {
 751                                con->sctp_assoc = 0;
 752                        }
 753                }
 754                break;
 755
 756                case SCTP_CANT_STR_ASSOC:
 757                {
 758                        /* Will retry init when we get the send failed notification */
 759                        log_print("Can't start SCTP association - retrying");
 760                }
 761                break;
 762
 763                default:
 764                        log_print("unexpected SCTP assoc change id=%d state=%d",
 765                                  (int)sn->sn_assoc_change.sac_assoc_id,
 766                                  sn->sn_assoc_change.sac_state);
 767                }
 768        default:
 769                ; /* fall through */
 770        }
 771}
 772
 773/* Data received from remote end */
 774static int receive_from_sock(struct connection *con)
 775{
 776        int ret = 0;
 777        struct msghdr msg = {};
 778        struct kvec iov[2];
 779        unsigned len;
 780        int r;
 781        int call_again_soon = 0;
 782        int nvec;
 783        char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 784
 785        mutex_lock(&con->sock_mutex);
 786
 787        if (con->sock == NULL) {
 788                ret = -EAGAIN;
 789                goto out_close;
 790        }
 791
 792        if (con->rx_page == NULL) {
 793                /*
 794                 * This doesn't need to be atomic, but I think it should
 795                 * improve performance if it is.
 796                 */
 797                con->rx_page = alloc_page(GFP_ATOMIC);
 798                if (con->rx_page == NULL)
 799                        goto out_resched;
 800                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
 801        }
 802
 803        /* Only SCTP needs these really */
 804        memset(&incmsg, 0, sizeof(incmsg));
 805        msg.msg_control = incmsg;
 806        msg.msg_controllen = sizeof(incmsg);
 807
 808        /*
 809         * iov[0] is the bit of the circular buffer between the current end
 810         * point (cb.base + cb.len) and the end of the buffer.
 811         */
 812        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
 813        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 814        iov[1].iov_len = 0;
 815        nvec = 1;
 816
 817        /*
 818         * iov[1] is the bit of the circular buffer between the start of the
 819         * buffer and the start of the currently used section (cb.base)
 820         */
 821        if (cbuf_data(&con->cb) >= con->cb.base) {
 822                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
 823                iov[1].iov_len = con->cb.base;
 824                iov[1].iov_base = page_address(con->rx_page);
 825                nvec = 2;
 826        }
 827        len = iov[0].iov_len + iov[1].iov_len;
 828
 829        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
 830                               MSG_DONTWAIT | MSG_NOSIGNAL);
 831        if (ret <= 0)
 832                goto out_close;
 833
 834        /* Process SCTP notifications */
 835        if (msg.msg_flags & MSG_NOTIFICATION) {
 836                msg.msg_control = incmsg;
 837                msg.msg_controllen = sizeof(incmsg);
 838
 839                process_sctp_notification(con, &msg,
 840                                page_address(con->rx_page) + con->cb.base);
 841                mutex_unlock(&con->sock_mutex);
 842                return 0;
 843        }
 844        BUG_ON(con->nodeid == 0);
 845
 846        if (ret == len)
 847                call_again_soon = 1;
 848        cbuf_add(&con->cb, ret);
 849        ret = dlm_process_incoming_buffer(con->nodeid,
 850                                          page_address(con->rx_page),
 851                                          con->cb.base, con->cb.len,
 852                                          PAGE_CACHE_SIZE);
 853        if (ret == -EBADMSG) {
 854                log_print("lowcomms: addr=%p, base=%u, len=%u, "
 855                          "iov_len=%u, iov_base[0]=%p, read=%d",
 856                          page_address(con->rx_page), con->cb.base, con->cb.len,
 857                          len, iov[0].iov_base, r);
 858        }
 859        if (ret < 0)
 860                goto out_close;
 861        cbuf_eat(&con->cb, ret);
 862
 863        if (cbuf_empty(&con->cb) && !call_again_soon) {
 864                __free_page(con->rx_page);
 865                con->rx_page = NULL;
 866        }
 867
 868        if (call_again_soon)
 869                goto out_resched;
 870        mutex_unlock(&con->sock_mutex);
 871        return 0;
 872
 873out_resched:
 874        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
 875                queue_work(recv_workqueue, &con->rwork);
 876        mutex_unlock(&con->sock_mutex);
 877        return -EAGAIN;
 878
 879out_close:
 880        mutex_unlock(&con->sock_mutex);
 881        if (ret != -EAGAIN) {
 882                close_connection(con, false);
 883                /* Reconnect when there is something to send */
 884        }
 885        /* Don't return success if we really got EOF */
 886        if (ret == 0)
 887                ret = -EAGAIN;
 888
 889        return ret;
 890}
 891
 892/* Listening socket is busy, accept a connection */
 893static int tcp_accept_from_sock(struct connection *con)
 894{
 895        int result;
 896        struct sockaddr_storage peeraddr;
 897        struct socket *newsock;
 898        int len;
 899        int nodeid;
 900        struct connection *newcon;
 901        struct connection *addcon;
 902
 903        mutex_lock(&connections_lock);
 904        if (!dlm_allow_conn) {
 905                mutex_unlock(&connections_lock);
 906                return -1;
 907        }
 908        mutex_unlock(&connections_lock);
 909
 910        memset(&peeraddr, 0, sizeof(peeraddr));
 911        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 912                                  IPPROTO_TCP, &newsock);
 913        if (result < 0)
 914                return -ENOMEM;
 915
 916        mutex_lock_nested(&con->sock_mutex, 0);
 917
 918        result = -ENOTCONN;
 919        if (con->sock == NULL)
 920                goto accept_err;
 921
 922        newsock->type = con->sock->type;
 923        newsock->ops = con->sock->ops;
 924
 925        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 926        if (result < 0)
 927                goto accept_err;
 928
 929        /* Get the connected socket's peer */
 930        memset(&peeraddr, 0, sizeof(peeraddr));
 931        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 932                                  &len, 2)) {
 933                result = -ECONNABORTED;
 934                goto accept_err;
 935        }
 936
 937        /* Get the new node's NODEID */
 938        make_sockaddr(&peeraddr, 0, &len);
 939        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 940                unsigned char *b=(unsigned char *)&peeraddr;
 941                log_print("connect from non cluster node");
 942                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 943                                     b, sizeof(struct sockaddr_storage));
 944                sock_release(newsock);
 945                mutex_unlock(&con->sock_mutex);
 946                return -1;
 947        }
 948
 949        log_print("got connection from %d", nodeid);
 950
 951        /*  Check to see if we already have a connection to this node. This
 952         *  could happen if the two nodes initiate a connection at roughly
 953         *  the same time and the connections cross on the wire.
 954         *  In this case we store the incoming one in "othercon"
 955         */
 956        newcon = nodeid2con(nodeid, GFP_NOFS);
 957        if (!newcon) {
 958                result = -ENOMEM;
 959                goto accept_err;
 960        }
 961        mutex_lock_nested(&newcon->sock_mutex, 1);
 962        if (newcon->sock) {
 963                struct connection *othercon = newcon->othercon;
 964
 965                if (!othercon) {
 966                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 967                        if (!othercon) {
 968                                log_print("failed to allocate incoming socket");
 969                                mutex_unlock(&newcon->sock_mutex);
 970                                result = -ENOMEM;
 971                                goto accept_err;
 972                        }
 973                        othercon->nodeid = nodeid;
 974                        othercon->rx_action = receive_from_sock;
 975                        mutex_init(&othercon->sock_mutex);
 976                        INIT_WORK(&othercon->swork, process_send_sockets);
 977                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 978                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 979                }
 980                if (!othercon->sock) {
 981                        newcon->othercon = othercon;
 982                        othercon->sock = newsock;
 983                        newsock->sk->sk_user_data = othercon;
 984                        add_sock(newsock, othercon);
 985                        addcon = othercon;
 986                }
 987                else {
 988                        printk("Extra connection from node %d attempted\n", nodeid);
 989                        result = -EAGAIN;
 990                        mutex_unlock(&newcon->sock_mutex);
 991                        goto accept_err;
 992                }
 993        }
 994        else {
 995                newsock->sk->sk_user_data = newcon;
 996                newcon->rx_action = receive_from_sock;
 997                add_sock(newsock, newcon);
 998                addcon = newcon;
 999        }
1000
1001        mutex_unlock(&newcon->sock_mutex);
1002
1003        /*
1004         * Add it to the active queue in case we got data
1005         * between processing the accept adding the socket
1006         * to the read_sockets list
1007         */
1008        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
1009                queue_work(recv_workqueue, &addcon->rwork);
1010        mutex_unlock(&con->sock_mutex);
1011
1012        return 0;
1013
1014accept_err:
1015        mutex_unlock(&con->sock_mutex);
1016        sock_release(newsock);
1017
1018        if (result != -EAGAIN)
1019                log_print("error accepting connection from node: %d", result);
1020        return result;
1021}
1022
1023static void free_entry(struct writequeue_entry *e)
1024{
1025        __free_page(e->page);
1026        kfree(e);
1027}
1028
1029/*
1030 * writequeue_entry_complete - try to delete and free write queue entry
1031 * @e: write queue entry to try to delete
1032 * @completed: bytes completed
1033 *
1034 * writequeue_lock must be held.
1035 */
1036static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1037{
1038        e->offset += completed;
1039        e->len -= completed;
1040
1041        if (e->len == 0 && e->users == 0) {
1042                list_del(&e->list);
1043                free_entry(e);
1044        }
1045}
1046
1047/* Initiate an SCTP association.
1048   This is a special case of send_to_sock() in that we don't yet have a
1049   peeled-off socket for this association, so we use the listening socket
1050   and add the primary IP address of the remote node.
1051 */
1052static void sctp_init_assoc(struct connection *con)
1053{
1054        struct sockaddr_storage rem_addr;
1055        char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
1056        struct msghdr outmessage;
1057        struct cmsghdr *cmsg;
1058        struct sctp_sndrcvinfo *sinfo;
1059        struct connection *base_con;
1060        struct writequeue_entry *e;
1061        int len, offset;
1062        int ret;
1063        int addrlen;
1064        struct kvec iov[1];
1065
1066        mutex_lock(&con->sock_mutex);
1067        if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1068                goto unlock;
1069
1070        if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
1071                           con->try_new_addr)) {
1072                log_print("no address for nodeid %d", con->nodeid);
1073                goto unlock;
1074        }
1075        base_con = nodeid2con(0, 0);
1076        BUG_ON(base_con == NULL);
1077
1078        make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
1079
1080        outmessage.msg_name = &rem_addr;
1081        outmessage.msg_namelen = addrlen;
1082        outmessage.msg_control = outcmsg;
1083        outmessage.msg_controllen = sizeof(outcmsg);
1084        outmessage.msg_flags = MSG_EOR;
1085
1086        spin_lock(&con->writequeue_lock);
1087
1088        if (list_empty(&con->writequeue)) {
1089                spin_unlock(&con->writequeue_lock);
1090                log_print("writequeue empty for nodeid %d", con->nodeid);
1091                goto unlock;
1092        }
1093
1094        e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1095        len = e->len;
1096        offset = e->offset;
1097
1098        /* Send the first block off the write queue */
1099        iov[0].iov_base = page_address(e->page)+offset;
1100        iov[0].iov_len = len;
1101        spin_unlock(&con->writequeue_lock);
1102
1103        if (rem_addr.ss_family == AF_INET) {
1104                struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
1105                log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr);
1106        } else {
1107                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr;
1108                log_print("Trying to connect to %pI6", &sin6->sin6_addr);
1109        }
1110
1111        cmsg = CMSG_FIRSTHDR(&outmessage);
1112        cmsg->cmsg_level = IPPROTO_SCTP;
1113        cmsg->cmsg_type = SCTP_SNDRCV;
1114        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1115        sinfo = CMSG_DATA(cmsg);
1116        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1117        sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
1118        outmessage.msg_controllen = cmsg->cmsg_len;
1119        sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1120
1121        ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
1122        if (ret < 0) {
1123                log_print("Send first packet to node %d failed: %d",
1124                          con->nodeid, ret);
1125
1126                /* Try again later */
1127                clear_bit(CF_CONNECT_PENDING, &con->flags);
1128                clear_bit(CF_INIT_PENDING, &con->flags);
1129        }
1130        else {
1131                spin_lock(&con->writequeue_lock);
1132                writequeue_entry_complete(e, ret);
1133                spin_unlock(&con->writequeue_lock);
1134        }
1135
1136unlock:
1137        mutex_unlock(&con->sock_mutex);
1138}
1139
1140/* Connect a new socket to its peer */
1141static void tcp_connect_to_sock(struct connection *con)
1142{
1143        struct sockaddr_storage saddr, src_addr;
1144        int addr_len;
1145        struct socket *sock = NULL;
1146        int one = 1;
1147        int result;
1148
1149        if (con->nodeid == 0) {
1150                log_print("attempt to connect sock 0 foiled");
1151                return;
1152        }
1153
1154        mutex_lock(&con->sock_mutex);
1155        if (con->retries++ > MAX_CONNECT_RETRIES)
1156                goto out;
1157
1158        /* Some odd races can cause double-connects, ignore them */
1159        if (con->sock)
1160                goto out;
1161
1162        /* Create a socket to communicate with */
1163        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1164                                  IPPROTO_TCP, &sock);
1165        if (result < 0)
1166                goto out_err;
1167
1168        memset(&saddr, 0, sizeof(saddr));
1169        result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1170        if (result < 0) {
1171                log_print("no address for nodeid %d", con->nodeid);
1172                goto out_err;
1173        }
1174
1175        sock->sk->sk_user_data = con;
1176        con->rx_action = receive_from_sock;
1177        con->connect_action = tcp_connect_to_sock;
1178        add_sock(sock, con);
1179
1180        /* Bind to our cluster-known address connecting to avoid
1181           routing problems */
1182        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1183        make_sockaddr(&src_addr, 0, &addr_len);
1184        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1185                                 addr_len);
1186        if (result < 0) {
1187                log_print("could not bind for connect: %d", result);
1188                /* This *may* not indicate a critical error */
1189        }
1190
1191        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1192
1193        log_print("connecting to %d", con->nodeid);
1194
1195        /* Turn off Nagle's algorithm */
1196        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1197                          sizeof(one));
1198
1199        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1200                                   O_NONBLOCK);
1201        if (result == -EINPROGRESS)
1202                result = 0;
1203        if (result == 0)
1204                goto out;
1205
1206out_err:
1207        if (con->sock) {
1208                sock_release(con->sock);
1209                con->sock = NULL;
1210        } else if (sock) {
1211                sock_release(sock);
1212        }
1213        /*
1214         * Some errors are fatal and this list might need adjusting. For other
1215         * errors we try again until the max number of retries is reached.
1216         */
1217        if (result != -EHOSTUNREACH &&
1218            result != -ENETUNREACH &&
1219            result != -ENETDOWN && 
1220            result != -EINVAL &&
1221            result != -EPROTONOSUPPORT) {
1222                log_print("connect %d try %d error %d", con->nodeid,
1223                          con->retries, result);
1224                mutex_unlock(&con->sock_mutex);
1225                msleep(1000);
1226                lowcomms_connect_sock(con);
1227                return;
1228        }
1229out:
1230        mutex_unlock(&con->sock_mutex);
1231        return;
1232}
1233
1234static struct socket *tcp_create_listen_sock(struct connection *con,
1235                                             struct sockaddr_storage *saddr)
1236{
1237        struct socket *sock = NULL;
1238        int result = 0;
1239        int one = 1;
1240        int addr_len;
1241
1242        if (dlm_local_addr[0]->ss_family == AF_INET)
1243                addr_len = sizeof(struct sockaddr_in);
1244        else
1245                addr_len = sizeof(struct sockaddr_in6);
1246
1247        /* Create a socket to communicate with */
1248        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1249                                  IPPROTO_TCP, &sock);
1250        if (result < 0) {
1251                log_print("Can't create listening comms socket");
1252                goto create_out;
1253        }
1254
1255        /* Turn off Nagle's algorithm */
1256        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1257                          sizeof(one));
1258
1259        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1260                                   (char *)&one, sizeof(one));
1261
1262        if (result < 0) {
1263                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1264        }
1265        con->rx_action = tcp_accept_from_sock;
1266        con->connect_action = tcp_connect_to_sock;
1267
1268        /* Bind to our port */
1269        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1270        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1271        if (result < 0) {
1272                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1273                sock_release(sock);
1274                sock = NULL;
1275                con->sock = NULL;
1276                goto create_out;
1277        }
1278        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1279                                 (char *)&one, sizeof(one));
1280        if (result < 0) {
1281                log_print("Set keepalive failed: %d", result);
1282        }
1283
1284        result = sock->ops->listen(sock, 5);
1285        if (result < 0) {
1286                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1287                sock_release(sock);
1288                sock = NULL;
1289                goto create_out;
1290        }
1291
1292create_out:
1293        return sock;
1294}
1295
1296/* Get local addresses */
1297static void init_local(void)
1298{
1299        struct sockaddr_storage sas, *addr;
1300        int i;
1301
1302        dlm_local_count = 0;
1303        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1304                if (dlm_our_addr(&sas, i))
1305                        break;
1306
1307                addr = kmalloc(sizeof(*addr), GFP_NOFS);
1308                if (!addr)
1309                        break;
1310                memcpy(addr, &sas, sizeof(*addr));
1311                dlm_local_addr[dlm_local_count++] = addr;
1312        }
1313}
1314
1315/* Bind to an IP address. SCTP allows multiple address so it can do
1316   multi-homing */
1317static int add_sctp_bind_addr(struct connection *sctp_con,
1318                              struct sockaddr_storage *addr,
1319                              int addr_len, int num)
1320{
1321        int result = 0;
1322
1323        if (num == 1)
1324                result = kernel_bind(sctp_con->sock,
1325                                     (struct sockaddr *) addr,
1326                                     addr_len);
1327        else
1328                result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1329                                           SCTP_SOCKOPT_BINDX_ADD,
1330                                           (char *)addr, addr_len);
1331
1332        if (result < 0)
1333                log_print("Can't bind to port %d addr number %d",
1334                          dlm_config.ci_tcp_port, num);
1335
1336        return result;
1337}
1338
1339/* Initialise SCTP socket and bind to all interfaces */
1340static int sctp_listen_for_all(void)
1341{
1342        struct socket *sock = NULL;
1343        struct sockaddr_storage localaddr;
1344        struct sctp_event_subscribe subscribe;
1345        int result = -EINVAL, num = 1, i, addr_len;
1346        struct connection *con = nodeid2con(0, GFP_NOFS);
1347        int bufsize = NEEDED_RMEM;
1348        int one = 1;
1349
1350        if (!con)
1351                return -ENOMEM;
1352
1353        log_print("Using SCTP for communications");
1354
1355        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1356                                  IPPROTO_SCTP, &sock);
1357        if (result < 0) {
1358                log_print("Can't create comms socket, check SCTP is loaded");
1359                goto out;
1360        }
1361
1362        /* Listen for events */
1363        memset(&subscribe, 0, sizeof(subscribe));
1364        subscribe.sctp_data_io_event = 1;
1365        subscribe.sctp_association_event = 1;
1366        subscribe.sctp_send_failure_event = 1;
1367        subscribe.sctp_shutdown_event = 1;
1368        subscribe.sctp_partial_delivery_event = 1;
1369
1370        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1371                                 (char *)&bufsize, sizeof(bufsize));
1372        if (result)
1373                log_print("Error increasing buffer space on socket %d", result);
1374
1375        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1376                                   (char *)&subscribe, sizeof(subscribe));
1377        if (result < 0) {
1378                log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1379                          result);
1380                goto create_delsock;
1381        }
1382
1383        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1384                                   sizeof(one));
1385        if (result < 0)
1386                log_print("Could not set SCTP NODELAY error %d\n", result);
1387
1388        /* Init con struct */
1389        sock->sk->sk_user_data = con;
1390        con->sock = sock;
1391        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1392        con->rx_action = receive_from_sock;
1393        con->connect_action = sctp_init_assoc;
1394
1395        /* Bind to all interfaces. */
1396        for (i = 0; i < dlm_local_count; i++) {
1397                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1398                make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1399
1400                result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1401                if (result)
1402                        goto create_delsock;
1403                ++num;
1404        }
1405
1406        result = sock->ops->listen(sock, 5);
1407        if (result < 0) {
1408                log_print("Can't set socket listening");
1409                goto create_delsock;
1410        }
1411
1412        return 0;
1413
1414create_delsock:
1415        sock_release(sock);
1416        con->sock = NULL;
1417out:
1418        return result;
1419}
1420
1421static int tcp_listen_for_all(void)
1422{
1423        struct socket *sock = NULL;
1424        struct connection *con = nodeid2con(0, GFP_NOFS);
1425        int result = -EINVAL;
1426
1427        if (!con)
1428                return -ENOMEM;
1429
1430        /* We don't support multi-homed hosts */
1431        if (dlm_local_addr[1] != NULL) {
1432                log_print("TCP protocol can't handle multi-homed hosts, "
1433                          "try SCTP");
1434                return -EINVAL;
1435        }
1436
1437        log_print("Using TCP for communications");
1438
1439        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1440        if (sock) {
1441                add_sock(sock, con);
1442                result = 0;
1443        }
1444        else {
1445                result = -EADDRINUSE;
1446        }
1447
1448        return result;
1449}
1450
1451
1452
1453static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1454                                                     gfp_t allocation)
1455{
1456        struct writequeue_entry *entry;
1457
1458        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1459        if (!entry)
1460                return NULL;
1461
1462        entry->page = alloc_page(allocation);
1463        if (!entry->page) {
1464                kfree(entry);
1465                return NULL;
1466        }
1467
1468        entry->offset = 0;
1469        entry->len = 0;
1470        entry->end = 0;
1471        entry->users = 0;
1472        entry->con = con;
1473
1474        return entry;
1475}
1476
1477void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1478{
1479        struct connection *con;
1480        struct writequeue_entry *e;
1481        int offset = 0;
1482
1483        con = nodeid2con(nodeid, allocation);
1484        if (!con)
1485                return NULL;
1486
1487        spin_lock(&con->writequeue_lock);
1488        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1489        if ((&e->list == &con->writequeue) ||
1490            (PAGE_CACHE_SIZE - e->end < len)) {
1491                e = NULL;
1492        } else {
1493                offset = e->end;
1494                e->end += len;
1495                e->users++;
1496        }
1497        spin_unlock(&con->writequeue_lock);
1498
1499        if (e) {
1500        got_one:
1501                *ppc = page_address(e->page) + offset;
1502                return e;
1503        }
1504
1505        e = new_writequeue_entry(con, allocation);
1506        if (e) {
1507                spin_lock(&con->writequeue_lock);
1508                offset = e->end;
1509                e->end += len;
1510                e->users++;
1511                list_add_tail(&e->list, &con->writequeue);
1512                spin_unlock(&con->writequeue_lock);
1513                goto got_one;
1514        }
1515        return NULL;
1516}
1517
1518void dlm_lowcomms_commit_buffer(void *mh)
1519{
1520        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1521        struct connection *con = e->con;
1522        int users;
1523
1524        spin_lock(&con->writequeue_lock);
1525        users = --e->users;
1526        if (users)
1527                goto out;
1528        e->len = e->end - e->offset;
1529        spin_unlock(&con->writequeue_lock);
1530
1531        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1532                queue_work(send_workqueue, &con->swork);
1533        }
1534        return;
1535
1536out:
1537        spin_unlock(&con->writequeue_lock);
1538        return;
1539}
1540
1541/* Send a message */
1542static void send_to_sock(struct connection *con)
1543{
1544        int ret = 0;
1545        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1546        struct writequeue_entry *e;
1547        int len, offset;
1548        int count = 0;
1549
1550        mutex_lock(&con->sock_mutex);
1551        if (con->sock == NULL)
1552                goto out_connect;
1553
1554        spin_lock(&con->writequeue_lock);
1555        for (;;) {
1556                e = list_entry(con->writequeue.next, struct writequeue_entry,
1557                               list);
1558                if ((struct list_head *) e == &con->writequeue)
1559                        break;
1560
1561                len = e->len;
1562                offset = e->offset;
1563                BUG_ON(len == 0 && e->users == 0);
1564                spin_unlock(&con->writequeue_lock);
1565
1566                ret = 0;
1567                if (len) {
1568                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1569                                              msg_flags);
1570                        if (ret == -EAGAIN || ret == 0) {
1571                                if (ret == -EAGAIN &&
1572                                    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1573                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1574                                        /* Notify TCP that we're limited by the
1575                                         * application window size.
1576                                         */
1577                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
1578                                        con->sock->sk->sk_write_pending++;
1579                                }
1580                                cond_resched();
1581                                goto out;
1582                        } else if (ret < 0)
1583                                goto send_error;
1584                }
1585
1586                /* Don't starve people filling buffers */
1587                if (++count >= MAX_SEND_MSG_COUNT) {
1588                        cond_resched();
1589                        count = 0;
1590                }
1591
1592                spin_lock(&con->writequeue_lock);
1593                writequeue_entry_complete(e, ret);
1594        }
1595        spin_unlock(&con->writequeue_lock);
1596out:
1597        mutex_unlock(&con->sock_mutex);
1598        return;
1599
1600send_error:
1601        mutex_unlock(&con->sock_mutex);
1602        close_connection(con, false);
1603        lowcomms_connect_sock(con);
1604        return;
1605
1606out_connect:
1607        mutex_unlock(&con->sock_mutex);
1608        if (!test_bit(CF_INIT_PENDING, &con->flags))
1609                lowcomms_connect_sock(con);
1610}
1611
1612static void clean_one_writequeue(struct connection *con)
1613{
1614        struct writequeue_entry *e, *safe;
1615
1616        spin_lock(&con->writequeue_lock);
1617        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1618                list_del(&e->list);
1619                free_entry(e);
1620        }
1621        spin_unlock(&con->writequeue_lock);
1622}
1623
1624/* Called from recovery when it knows that a node has
1625   left the cluster */
1626int dlm_lowcomms_close(int nodeid)
1627{
1628        struct connection *con;
1629        struct dlm_node_addr *na;
1630
1631        log_print("closing connection to node %d", nodeid);
1632        con = nodeid2con(nodeid, 0);
1633        if (con) {
1634                clear_bit(CF_CONNECT_PENDING, &con->flags);
1635                clear_bit(CF_WRITE_PENDING, &con->flags);
1636                set_bit(CF_CLOSE, &con->flags);
1637                if (cancel_work_sync(&con->swork))
1638                        log_print("canceled swork for node %d", nodeid);
1639                if (cancel_work_sync(&con->rwork))
1640                        log_print("canceled rwork for node %d", nodeid);
1641                clean_one_writequeue(con);
1642                close_connection(con, true);
1643        }
1644
1645        spin_lock(&dlm_node_addrs_spin);
1646        na = find_node_addr(nodeid);
1647        if (na) {
1648                list_del(&na->list);
1649                while (na->addr_count--)
1650                        kfree(na->addr[na->addr_count]);
1651                kfree(na);
1652        }
1653        spin_unlock(&dlm_node_addrs_spin);
1654
1655        return 0;
1656}
1657
1658/* Receive workqueue function */
1659static void process_recv_sockets(struct work_struct *work)
1660{
1661        struct connection *con = container_of(work, struct connection, rwork);
1662        int err;
1663
1664        clear_bit(CF_READ_PENDING, &con->flags);
1665        do {
1666                err = con->rx_action(con);
1667        } while (!err);
1668}
1669
1670/* Send workqueue function */
1671static void process_send_sockets(struct work_struct *work)
1672{
1673        struct connection *con = container_of(work, struct connection, swork);
1674
1675        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1676                con->connect_action(con);
1677                set_bit(CF_WRITE_PENDING, &con->flags);
1678        }
1679        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1680                send_to_sock(con);
1681}
1682
1683
1684/* Discard all entries on the write queues */
1685static void clean_writequeues(void)
1686{
1687        foreach_conn(clean_one_writequeue);
1688}
1689
1690static void work_stop(void)
1691{
1692        destroy_workqueue(recv_workqueue);
1693        destroy_workqueue(send_workqueue);
1694}
1695
1696static int work_start(void)
1697{
1698        recv_workqueue = alloc_workqueue("dlm_recv",
1699                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1700        if (!recv_workqueue) {
1701                log_print("can't start dlm_recv");
1702                return -ENOMEM;
1703        }
1704
1705        send_workqueue = alloc_workqueue("dlm_send",
1706                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1707        if (!send_workqueue) {
1708                log_print("can't start dlm_send");
1709                destroy_workqueue(recv_workqueue);
1710                return -ENOMEM;
1711        }
1712
1713        return 0;
1714}
1715
1716static void stop_conn(struct connection *con)
1717{
1718        con->flags |= 0x0F;
1719        if (con->sock && con->sock->sk)
1720                con->sock->sk->sk_user_data = NULL;
1721}
1722
1723static void free_conn(struct connection *con)
1724{
1725        close_connection(con, true);
1726        if (con->othercon)
1727                kmem_cache_free(con_cache, con->othercon);
1728        hlist_del(&con->list);
1729        kmem_cache_free(con_cache, con);
1730}
1731
1732void dlm_lowcomms_stop(void)
1733{
1734        /* Set all the flags to prevent any
1735           socket activity.
1736        */
1737        mutex_lock(&connections_lock);
1738        dlm_allow_conn = 0;
1739        foreach_conn(stop_conn);
1740        mutex_unlock(&connections_lock);
1741
1742        work_stop();
1743
1744        mutex_lock(&connections_lock);
1745        clean_writequeues();
1746
1747        foreach_conn(free_conn);
1748
1749        mutex_unlock(&connections_lock);
1750        kmem_cache_destroy(con_cache);
1751}
1752
1753int dlm_lowcomms_start(void)
1754{
1755        int error = -EINVAL;
1756        struct connection *con;
1757        int i;
1758
1759        for (i = 0; i < CONN_HASH_SIZE; i++)
1760                INIT_HLIST_HEAD(&connection_hash[i]);
1761
1762        init_local();
1763        if (!dlm_local_count) {
1764                error = -ENOTCONN;
1765                log_print("no local IP address has been set");
1766                goto fail;
1767        }
1768
1769        error = -ENOMEM;
1770        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1771                                      __alignof__(struct connection), 0,
1772                                      NULL);
1773        if (!con_cache)
1774                goto fail;
1775
1776        error = work_start();
1777        if (error)
1778                goto fail_destroy;
1779
1780        dlm_allow_conn = 1;
1781
1782        /* Start listening */
1783        if (dlm_config.ci_protocol == 0)
1784                error = tcp_listen_for_all();
1785        else
1786                error = sctp_listen_for_all();
1787        if (error)
1788                goto fail_unlisten;
1789
1790        return 0;
1791
1792fail_unlisten:
1793        dlm_allow_conn = 0;
1794        con = nodeid2con(0,0);
1795        if (con) {
1796                close_connection(con, false);
1797                kmem_cache_free(con_cache, con);
1798        }
1799fail_destroy:
1800        kmem_cache_destroy(con_cache);
1801fail:
1802        return error;
1803}
1804
1805void dlm_lowcomms_exit(void)
1806{
1807        struct dlm_node_addr *na, *safe;
1808
1809        spin_lock(&dlm_node_addrs_spin);
1810        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1811                list_del(&na->list);
1812                while (na->addr_count--)
1813                        kfree(na->addr[na->addr_count]);
1814                kfree(na);
1815        }
1816        spin_unlock(&dlm_node_addrs_spin);
1817}
1818
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.