linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
 * The comms level uses two workqueues: a receive workqueue that deals
 * mainly with receiving messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send workqueue which does all the setting up of connections
 * to remote nodes and the sending of data. Callers are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the send work can collect together
 * messages bound for one node and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/sctp/user.h>
  57#include <net/ipv6.h>
  58
  59#include "dlm_internal.h"
  60#include "lowcomms.h"
  61#include "midcomms.h"
  62#include "config.h"
  63
/*
 * NOTE(review): NEEDED_RMEM is not referenced in this part of the file;
 * presumably a 4 MiB receive-buffer size applied to sockets elsewhere --
 * confirm.
 */
#define NEEDED_RMEM (4*1024*1024)
/*
 * Number of buckets in connection_hash.  Must stay a power of two:
 * nodeid_hash() selects a bucket by masking with CONN_HASH_SIZE-1.
 */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
  69
  70struct cbuf {
  71        unsigned int base;
  72        unsigned int len;
  73        unsigned int mask;
  74};
  75
  76static void cbuf_add(struct cbuf *cb, int n)
  77{
  78        cb->len += n;
  79}
  80
  81static int cbuf_data(struct cbuf *cb)
  82{
  83        return ((cb->base + cb->len) & cb->mask);
  84}
  85
  86static void cbuf_init(struct cbuf *cb, int size)
  87{
  88        cb->base = cb->len = 0;
  89        cb->mask = size-1;
  90}
  91
  92static void cbuf_eat(struct cbuf *cb, int n)
  93{
  94        cb->len  -= n;
  95        cb->base += n;
  96        cb->base &= cb->mask;
  97}
  98
  99static bool cbuf_empty(struct cbuf *cb)
 100{
 101        return cb->len == 0;
 102}
 103
/*
 * Per-node connection state, kept in connection_hash keyed by nodeid.
 * New connections copy rx_action/connect_action from the nodeid 0 entry
 * (see __nodeid2con).
 */
struct connection {
	struct socket *sock;    /* NULL if not connected */
	uint32_t nodeid;        /* So we know who we are in the list */
	struct mutex sock_mutex;	/* held while sock / rx_page are used or torn down */
	unsigned long flags;	/* CF_* bit numbers below, used with atomic bitops */
#define CF_READ_PENDING 1	/* rwork queued (set in lowcomms_data_ready) */
#define CF_WRITE_PENDING 2	/* swork queued for sending */
#define CF_CONNECT_PENDING 3	/* swork queued to connect (lowcomms_connect_sock) */
#define CF_INIT_PENDING 4	/* NOTE(review): SCTP INIT in progress? set outside this view */
#define CF_IS_OTHERCON 5	/* NOTE(review): not referenced in this view */
#define CF_CLOSE 6		/* lowcomms_connect_sock refuses to connect when set */
#define CF_APP_LIMITED 7	/* cleared in lowcomms_write_space, which then drops sk_write_pending */
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *); /* What to do when active */
	void (*connect_action) (struct connection *);   /* What to do to connect */
	struct page *rx_page;	/* receive page, managed as the circular buffer cb */
	struct cbuf cb;
	int retries;		/* reset to 0 by close_connection */
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;		/* SCTP association id; 0 when none */
	struct hlist_node list;	/* linkage in a connection_hash bucket */
	struct connection *othercon;	/* second connection to the same node (crossed connects) */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};
/* Map a struct sock back to its connection (sk_user_data is set in add_sock). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 131
/* An entry waiting to be sent */
/*
 * These are queued on connection->writequeue.  NOTE(review): the code that
 * fills and drains entries is outside this view; the per-field notes below
 * are inferred from the names and should be confirmed.
 */
struct writequeue_entry {
	struct list_head list;	/* linkage on con->writequeue */
	struct page *page;	/* page holding the outgoing bytes */
	int offset;		/* presumably start of unsent data in page */
	int len;		/* presumably bytes ready to send */
	int end;		/* presumably end of space reserved so far */
	int users;		/* presumably writers still filling the entry */
	struct connection *con;	/* owning connection */
};
 142
/*
 * Configured addresses for one cluster node, kept on dlm_node_addrs.
 * dlm_lowcomms_addr() appends to addr[]; lookups in this file only use
 * addr[0].
 */
struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;		/* number of valid entries in addr[] */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
 149
/* Known node addresses; accessed under dlm_node_addrs_spin. */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/*
 * Local addresses.  The family of dlm_local_addr[0] decides IPv4 vs IPv6
 * throughout this file; dlm_local_count == 0 means none configured yet.
 */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Checked under connections_lock before accepting an incoming TCP connection. */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;	/* runs connection->rwork */
static struct workqueue_struct *send_workqueue;	/* runs connection->swork */

/* All connections, bucketed by nodeid_hash(); accessed under connections_lock. */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
/* Slab cache for struct connection. */
static struct kmem_cache *con_cache;

/* Work functions installed by INIT_WORK() in __nodeid2con(). */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
 167
 168
 169/* This is deliberately very simple because most clusters have simple
 170   sequential nodeids, so we should be able to go straight to a connection
 171   struct in the array */
 172static inline int nodeid_hash(int nodeid)
 173{
 174        return nodeid & (CONN_HASH_SIZE-1);
 175}
 176
 177static struct connection *__find_con(int nodeid)
 178{
 179        int r;
 180        struct hlist_node *h;
 181        struct connection *con;
 182
 183        r = nodeid_hash(nodeid);
 184
 185        hlist_for_each_entry(con, h, &connection_hash[r], list) {
 186                if (con->nodeid == nodeid)
 187                        return con;
 188        }
 189        return NULL;
 190}
 191
 192/*
 193 * If 'allocation' is zero then we don't attempt to create a new
 194 * connection structure for this node.
 195 */
 196static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 197{
 198        struct connection *con = NULL;
 199        int r;
 200
 201        con = __find_con(nodeid);
 202        if (con || !alloc)
 203                return con;
 204
 205        con = kmem_cache_zalloc(con_cache, alloc);
 206        if (!con)
 207                return NULL;
 208
 209        r = nodeid_hash(nodeid);
 210        hlist_add_head(&con->list, &connection_hash[r]);
 211
 212        con->nodeid = nodeid;
 213        mutex_init(&con->sock_mutex);
 214        INIT_LIST_HEAD(&con->writequeue);
 215        spin_lock_init(&con->writequeue_lock);
 216        INIT_WORK(&con->swork, process_send_sockets);
 217        INIT_WORK(&con->rwork, process_recv_sockets);
 218
 219        /* Setup action pointers for child sockets */
 220        if (con->nodeid) {
 221                struct connection *zerocon = __find_con(0);
 222
 223                con->connect_action = zerocon->connect_action;
 224                if (!con->rx_action)
 225                        con->rx_action = zerocon->rx_action;
 226        }
 227
 228        return con;
 229}
 230
 231/* Loop round all connections */
 232static void foreach_conn(void (*conn_func)(struct connection *c))
 233{
 234        int i;
 235        struct hlist_node *h, *n;
 236        struct connection *con;
 237
 238        for (i = 0; i < CONN_HASH_SIZE; i++) {
 239                hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
 240                        conn_func(con);
 241                }
 242        }
 243}
 244
 245static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 246{
 247        struct connection *con;
 248
 249        mutex_lock(&connections_lock);
 250        con = __nodeid2con(nodeid, allocation);
 251        mutex_unlock(&connections_lock);
 252
 253        return con;
 254}
 255
 256/* This is a bit drastic, but only called when things go wrong */
 257static struct connection *assoc2con(int assoc_id)
 258{
 259        int i;
 260        struct hlist_node *h;
 261        struct connection *con;
 262
 263        mutex_lock(&connections_lock);
 264
 265        for (i = 0 ; i < CONN_HASH_SIZE; i++) {
 266                hlist_for_each_entry(con, h, &connection_hash[i], list) {
 267                        if (con->sctp_assoc == assoc_id) {
 268                                mutex_unlock(&connections_lock);
 269                                return con;
 270                        }
 271                }
 272        }
 273        mutex_unlock(&connections_lock);
 274        return NULL;
 275}
 276
 277static struct dlm_node_addr *find_node_addr(int nodeid)
 278{
 279        struct dlm_node_addr *na;
 280
 281        list_for_each_entry(na, &dlm_node_addrs, list) {
 282                if (na->nodeid == nodeid)
 283                        return na;
 284        }
 285        return NULL;
 286}
 287
 288static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 289{
 290        switch (x->ss_family) {
 291        case AF_INET: {
 292                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 293                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 294                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 295                        return 0;
 296                if (sinx->sin_port != siny->sin_port)
 297                        return 0;
 298                break;
 299        }
 300        case AF_INET6: {
 301                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 302                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 303                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 304                        return 0;
 305                if (sinx->sin6_port != siny->sin6_port)
 306                        return 0;
 307                break;
 308        }
 309        default:
 310                return 0;
 311        }
 312        return 1;
 313}
 314
 315static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 316                          struct sockaddr *sa_out)
 317{
 318        struct sockaddr_storage sas;
 319        struct dlm_node_addr *na;
 320
 321        if (!dlm_local_count)
 322                return -1;
 323
 324        spin_lock(&dlm_node_addrs_spin);
 325        na = find_node_addr(nodeid);
 326        if (na && na->addr_count)
 327                memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
 328        spin_unlock(&dlm_node_addrs_spin);
 329
 330        if (!na)
 331                return -EEXIST;
 332
 333        if (!na->addr_count)
 334                return -ENOENT;
 335
 336        if (sas_out)
 337                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 338
 339        if (!sa_out)
 340                return 0;
 341
 342        if (dlm_local_addr[0]->ss_family == AF_INET) {
 343                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 344                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 345                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 346        } else {
 347                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 348                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 349                ret6->sin6_addr = in6->sin6_addr;
 350        }
 351
 352        return 0;
 353}
 354
 355static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 356{
 357        struct dlm_node_addr *na;
 358        int rv = -EEXIST;
 359
 360        spin_lock(&dlm_node_addrs_spin);
 361        list_for_each_entry(na, &dlm_node_addrs, list) {
 362                if (!na->addr_count)
 363                        continue;
 364
 365                if (!addr_compare(na->addr[0], addr))
 366                        continue;
 367
 368                *nodeid = na->nodeid;
 369                rv = 0;
 370                break;
 371        }
 372        spin_unlock(&dlm_node_addrs_spin);
 373        return rv;
 374}
 375
 376int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 377{
 378        struct sockaddr_storage *new_addr;
 379        struct dlm_node_addr *new_node, *na;
 380
 381        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 382        if (!new_node)
 383                return -ENOMEM;
 384
 385        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 386        if (!new_addr) {
 387                kfree(new_node);
 388                return -ENOMEM;
 389        }
 390
 391        memcpy(new_addr, addr, len);
 392
 393        spin_lock(&dlm_node_addrs_spin);
 394        na = find_node_addr(nodeid);
 395        if (!na) {
 396                new_node->nodeid = nodeid;
 397                new_node->addr[0] = new_addr;
 398                new_node->addr_count = 1;
 399                list_add(&new_node->list, &dlm_node_addrs);
 400                spin_unlock(&dlm_node_addrs_spin);
 401                return 0;
 402        }
 403
 404        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 405                spin_unlock(&dlm_node_addrs_spin);
 406                kfree(new_addr);
 407                kfree(new_node);
 408                return -ENOSPC;
 409        }
 410
 411        na->addr[na->addr_count++] = new_addr;
 412        spin_unlock(&dlm_node_addrs_spin);
 413        kfree(new_node);
 414        return 0;
 415}
 416
 417/* Data available on socket or listen socket received a connect */
 418static void lowcomms_data_ready(struct sock *sk, int count_unused)
 419{
 420        struct connection *con = sock2con(sk);
 421        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 422                queue_work(recv_workqueue, &con->rwork);
 423}
 424
 425static void lowcomms_write_space(struct sock *sk)
 426{
 427        struct connection *con = sock2con(sk);
 428
 429        if (!con)
 430                return;
 431
 432        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 433
 434        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 435                con->sock->sk->sk_write_pending--;
 436                clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
 437        }
 438
 439        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 440                queue_work(send_workqueue, &con->swork);
 441}
 442
 443static inline void lowcomms_connect_sock(struct connection *con)
 444{
 445        if (test_bit(CF_CLOSE, &con->flags))
 446                return;
 447        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 448                queue_work(send_workqueue, &con->swork);
 449}
 450
 451static void lowcomms_state_change(struct sock *sk)
 452{
 453        if (sk->sk_state == TCP_ESTABLISHED)
 454                lowcomms_write_space(sk);
 455}
 456
 457int dlm_lowcomms_connect_node(int nodeid)
 458{
 459        struct connection *con;
 460
 461        /* with sctp there's no connecting without sending */
 462        if (dlm_config.ci_protocol != 0)
 463                return 0;
 464
 465        if (nodeid == dlm_our_nodeid())
 466                return 0;
 467
 468        con = nodeid2con(nodeid, GFP_NOFS);
 469        if (!con)
 470                return -ENOMEM;
 471        lowcomms_connect_sock(con);
 472        return 0;
 473}
 474
 475/* Make a socket active */
 476static void add_sock(struct socket *sock, struct connection *con)
 477{
 478        con->sock = sock;
 479
 480        /* Install a data_ready callback */
 481        con->sock->sk->sk_data_ready = lowcomms_data_ready;
 482        con->sock->sk->sk_write_space = lowcomms_write_space;
 483        con->sock->sk->sk_state_change = lowcomms_state_change;
 484        con->sock->sk->sk_user_data = con;
 485        con->sock->sk->sk_allocation = GFP_NOFS;
 486}
 487
 488/* Add the port number to an IPv6 or 4 sockaddr and return the address
 489   length */
 490static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 491                          int *addr_len)
 492{
 493        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 494        if (saddr->ss_family == AF_INET) {
 495                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 496                in4_addr->sin_port = cpu_to_be16(port);
 497                *addr_len = sizeof(struct sockaddr_in);
 498                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 499        } else {
 500                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 501                in6_addr->sin6_port = cpu_to_be16(port);
 502                *addr_len = sizeof(struct sockaddr_in6);
 503        }
 504        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 505}
 506
 507/* Close a remote connection and tidy up */
 508static void close_connection(struct connection *con, bool and_other)
 509{
 510        mutex_lock(&con->sock_mutex);
 511
 512        if (con->sock) {
 513                sock_release(con->sock);
 514                con->sock = NULL;
 515        }
 516        if (con->othercon && and_other) {
 517                /* Will only re-enter once. */
 518                close_connection(con->othercon, false);
 519        }
 520        if (con->rx_page) {
 521                __free_page(con->rx_page);
 522                con->rx_page = NULL;
 523        }
 524
 525        con->retries = 0;
 526        mutex_unlock(&con->sock_mutex);
 527}
 528
 529/* We only send shutdown messages to nodes that are not part of the cluster */
 530static void sctp_send_shutdown(sctp_assoc_t associd)
 531{
 532        static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 533        struct msghdr outmessage;
 534        struct cmsghdr *cmsg;
 535        struct sctp_sndrcvinfo *sinfo;
 536        int ret;
 537        struct connection *con;
 538
 539        con = nodeid2con(0,0);
 540        BUG_ON(con == NULL);
 541
 542        outmessage.msg_name = NULL;
 543        outmessage.msg_namelen = 0;
 544        outmessage.msg_control = outcmsg;
 545        outmessage.msg_controllen = sizeof(outcmsg);
 546        outmessage.msg_flags = MSG_EOR;
 547
 548        cmsg = CMSG_FIRSTHDR(&outmessage);
 549        cmsg->cmsg_level = IPPROTO_SCTP;
 550        cmsg->cmsg_type = SCTP_SNDRCV;
 551        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 552        outmessage.msg_controllen = cmsg->cmsg_len;
 553        sinfo = CMSG_DATA(cmsg);
 554        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 555
 556        sinfo->sinfo_flags |= MSG_EOF;
 557        sinfo->sinfo_assoc_id = associd;
 558
 559        ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
 560
 561        if (ret != 0)
 562                log_print("send EOF to node failed: %d", ret);
 563}
 564
 565static void sctp_init_failed_foreach(struct connection *con)
 566{
 567        con->sctp_assoc = 0;
 568        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
 569                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 570                        queue_work(send_workqueue, &con->swork);
 571        }
 572}
 573
 574/* INIT failed but we don't know which node...
 575   restart INIT on all pending nodes */
 576static void sctp_init_failed(void)
 577{
 578        mutex_lock(&connections_lock);
 579
 580        foreach_conn(sctp_init_failed_foreach);
 581
 582        mutex_unlock(&connections_lock);
 583}
 584
 585/* Something happened to an association */
 586static void process_sctp_notification(struct connection *con,
 587                                      struct msghdr *msg, char *buf)
 588{
 589        union sctp_notification *sn = (union sctp_notification *)buf;
 590
 591        if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
 592                switch (sn->sn_assoc_change.sac_state) {
 593
 594                case SCTP_COMM_UP:
 595                case SCTP_RESTART:
 596                {
 597                        /* Check that the new node is in the lockspace */
 598                        struct sctp_prim prim;
 599                        int nodeid;
 600                        int prim_len, ret;
 601                        int addr_len;
 602                        struct connection *new_con;
 603
 604                        /*
 605                         * We get this before any data for an association.
 606                         * We verify that the node is in the cluster and
 607                         * then peel off a socket for it.
 608                         */
 609                        if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
 610                                log_print("COMM_UP for invalid assoc ID %d",
 611                                         (int)sn->sn_assoc_change.sac_assoc_id);
 612                                sctp_init_failed();
 613                                return;
 614                        }
 615                        memset(&prim, 0, sizeof(struct sctp_prim));
 616                        prim_len = sizeof(struct sctp_prim);
 617                        prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
 618
 619                        ret = kernel_getsockopt(con->sock,
 620                                                IPPROTO_SCTP,
 621                                                SCTP_PRIMARY_ADDR,
 622                                                (char*)&prim,
 623                                                &prim_len);
 624                        if (ret < 0) {
 625                                log_print("getsockopt/sctp_primary_addr on "
 626                                          "new assoc %d failed : %d",
 627                                          (int)sn->sn_assoc_change.sac_assoc_id,
 628                                          ret);
 629
 630                                /* Retry INIT later */
 631                                new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 632                                if (new_con)
 633                                        clear_bit(CF_CONNECT_PENDING, &con->flags);
 634                                return;
 635                        }
 636                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 637                        if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 638                                unsigned char *b=(unsigned char *)&prim.ssp_addr;
 639                                log_print("reject connect from unknown addr");
 640                                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 641                                                     b, sizeof(struct sockaddr_storage));
 642                                sctp_send_shutdown(prim.ssp_assoc_id);
 643                                return;
 644                        }
 645
 646                        new_con = nodeid2con(nodeid, GFP_NOFS);
 647                        if (!new_con)
 648                                return;
 649
 650                        /* Peel off a new sock */
 651                        sctp_lock_sock(con->sock->sk);
 652                        ret = sctp_do_peeloff(con->sock->sk,
 653                                sn->sn_assoc_change.sac_assoc_id,
 654                                &new_con->sock);
 655                        sctp_release_sock(con->sock->sk);
 656                        if (ret < 0) {
 657                                log_print("Can't peel off a socket for "
 658                                          "connection %d to node %d: err=%d",
 659                                          (int)sn->sn_assoc_change.sac_assoc_id,
 660                                          nodeid, ret);
 661                                return;
 662                        }
 663                        add_sock(new_con->sock, new_con);
 664
 665                        log_print("connecting to %d sctp association %d",
 666                                 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
 667
 668                        /* Send any pending writes */
 669                        clear_bit(CF_CONNECT_PENDING, &new_con->flags);
 670                        clear_bit(CF_INIT_PENDING, &con->flags);
 671                        if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
 672                                queue_work(send_workqueue, &new_con->swork);
 673                        }
 674                        if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
 675                                queue_work(recv_workqueue, &new_con->rwork);
 676                }
 677                break;
 678
 679                case SCTP_COMM_LOST:
 680                case SCTP_SHUTDOWN_COMP:
 681                {
 682                        con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 683                        if (con) {
 684                                con->sctp_assoc = 0;
 685                        }
 686                }
 687                break;
 688
 689                /* We don't know which INIT failed, so clear the PENDING flags
 690                 * on them all.  if assoc_id is zero then it will then try
 691                 * again */
 692
 693                case SCTP_CANT_STR_ASSOC:
 694                {
 695                        log_print("Can't start SCTP association - retrying");
 696                        sctp_init_failed();
 697                }
 698                break;
 699
 700                default:
 701                        log_print("unexpected SCTP assoc change id=%d state=%d",
 702                                  (int)sn->sn_assoc_change.sac_assoc_id,
 703                                  sn->sn_assoc_change.sac_state);
 704                }
 705        }
 706}
 707
 708/* Data received from remote end */
 709static int receive_from_sock(struct connection *con)
 710{
 711        int ret = 0;
 712        struct msghdr msg = {};
 713        struct kvec iov[2];
 714        unsigned len;
 715        int r;
 716        int call_again_soon = 0;
 717        int nvec;
 718        char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 719
 720        mutex_lock(&con->sock_mutex);
 721
 722        if (con->sock == NULL) {
 723                ret = -EAGAIN;
 724                goto out_close;
 725        }
 726
 727        if (con->rx_page == NULL) {
 728                /*
 729                 * This doesn't need to be atomic, but I think it should
 730                 * improve performance if it is.
 731                 */
 732                con->rx_page = alloc_page(GFP_ATOMIC);
 733                if (con->rx_page == NULL)
 734                        goto out_resched;
 735                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
 736        }
 737
 738        /* Only SCTP needs these really */
 739        memset(&incmsg, 0, sizeof(incmsg));
 740        msg.msg_control = incmsg;
 741        msg.msg_controllen = sizeof(incmsg);
 742
 743        /*
 744         * iov[0] is the bit of the circular buffer between the current end
 745         * point (cb.base + cb.len) and the end of the buffer.
 746         */
 747        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
 748        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 749        iov[1].iov_len = 0;
 750        nvec = 1;
 751
 752        /*
 753         * iov[1] is the bit of the circular buffer between the start of the
 754         * buffer and the start of the currently used section (cb.base)
 755         */
 756        if (cbuf_data(&con->cb) >= con->cb.base) {
 757                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
 758                iov[1].iov_len = con->cb.base;
 759                iov[1].iov_base = page_address(con->rx_page);
 760                nvec = 2;
 761        }
 762        len = iov[0].iov_len + iov[1].iov_len;
 763
 764        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
 765                               MSG_DONTWAIT | MSG_NOSIGNAL);
 766        if (ret <= 0)
 767                goto out_close;
 768
 769        /* Process SCTP notifications */
 770        if (msg.msg_flags & MSG_NOTIFICATION) {
 771                msg.msg_control = incmsg;
 772                msg.msg_controllen = sizeof(incmsg);
 773
 774                process_sctp_notification(con, &msg,
 775                                page_address(con->rx_page) + con->cb.base);
 776                mutex_unlock(&con->sock_mutex);
 777                return 0;
 778        }
 779        BUG_ON(con->nodeid == 0);
 780
 781        if (ret == len)
 782                call_again_soon = 1;
 783        cbuf_add(&con->cb, ret);
 784        ret = dlm_process_incoming_buffer(con->nodeid,
 785                                          page_address(con->rx_page),
 786                                          con->cb.base, con->cb.len,
 787                                          PAGE_CACHE_SIZE);
 788        if (ret == -EBADMSG) {
 789                log_print("lowcomms: addr=%p, base=%u, len=%u, "
 790                          "iov_len=%u, iov_base[0]=%p, read=%d",
 791                          page_address(con->rx_page), con->cb.base, con->cb.len,
 792                          len, iov[0].iov_base, r);
 793        }
 794        if (ret < 0)
 795                goto out_close;
 796        cbuf_eat(&con->cb, ret);
 797
 798        if (cbuf_empty(&con->cb) && !call_again_soon) {
 799                __free_page(con->rx_page);
 800                con->rx_page = NULL;
 801        }
 802
 803        if (call_again_soon)
 804                goto out_resched;
 805        mutex_unlock(&con->sock_mutex);
 806        return 0;
 807
 808out_resched:
 809        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
 810                queue_work(recv_workqueue, &con->rwork);
 811        mutex_unlock(&con->sock_mutex);
 812        return -EAGAIN;
 813
 814out_close:
 815        mutex_unlock(&con->sock_mutex);
 816        if (ret != -EAGAIN) {
 817                close_connection(con, false);
 818                /* Reconnect when there is something to send */
 819        }
 820        /* Don't return success if we really got EOF */
 821        if (ret == 0)
 822                ret = -EAGAIN;
 823
 824        return ret;
 825}
 826
 827/* Listening socket is busy, accept a connection */
 828static int tcp_accept_from_sock(struct connection *con)
 829{
 830        int result;
 831        struct sockaddr_storage peeraddr;
 832        struct socket *newsock;
 833        int len;
 834        int nodeid;
 835        struct connection *newcon;
 836        struct connection *addcon;
 837
 838        mutex_lock(&connections_lock);
 839        if (!dlm_allow_conn) {
 840                mutex_unlock(&connections_lock);
 841                return -1;
 842        }
 843        mutex_unlock(&connections_lock);
 844
 845        memset(&peeraddr, 0, sizeof(peeraddr));
 846        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 847                                  IPPROTO_TCP, &newsock);
 848        if (result < 0)
 849                return -ENOMEM;
 850
 851        mutex_lock_nested(&con->sock_mutex, 0);
 852
 853        result = -ENOTCONN;
 854        if (con->sock == NULL)
 855                goto accept_err;
 856
 857        newsock->type = con->sock->type;
 858        newsock->ops = con->sock->ops;
 859
 860        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 861        if (result < 0)
 862                goto accept_err;
 863
 864        /* Get the connected socket's peer */
 865        memset(&peeraddr, 0, sizeof(peeraddr));
 866        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 867                                  &len, 2)) {
 868                result = -ECONNABORTED;
 869                goto accept_err;
 870        }
 871
 872        /* Get the new node's NODEID */
 873        make_sockaddr(&peeraddr, 0, &len);
 874        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 875                unsigned char *b=(unsigned char *)&peeraddr;
 876                log_print("connect from non cluster node");
 877                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 878                                     b, sizeof(struct sockaddr_storage));
 879                sock_release(newsock);
 880                mutex_unlock(&con->sock_mutex);
 881                return -1;
 882        }
 883
 884        log_print("got connection from %d", nodeid);
 885
 886        /*  Check to see if we already have a connection to this node. This
 887         *  could happen if the two nodes initiate a connection at roughly
 888         *  the same time and the connections cross on the wire.
 889         *  In this case we store the incoming one in "othercon"
 890         */
 891        newcon = nodeid2con(nodeid, GFP_NOFS);
 892        if (!newcon) {
 893                result = -ENOMEM;
 894                goto accept_err;
 895        }
 896        mutex_lock_nested(&newcon->sock_mutex, 1);
 897        if (newcon->sock) {
 898                struct connection *othercon = newcon->othercon;
 899
 900                if (!othercon) {
 901                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 902                        if (!othercon) {
 903                                log_print("failed to allocate incoming socket");
 904                                mutex_unlock(&newcon->sock_mutex);
 905                                result = -ENOMEM;
 906                                goto accept_err;
 907                        }
 908                        othercon->nodeid = nodeid;
 909                        othercon->rx_action = receive_from_sock;
 910                        mutex_init(&othercon->sock_mutex);
 911                        INIT_WORK(&othercon->swork, process_send_sockets);
 912                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 913                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 914                }
 915                if (!othercon->sock) {
 916                        newcon->othercon = othercon;
 917                        othercon->sock = newsock;
 918                        newsock->sk->sk_user_data = othercon;
 919                        add_sock(newsock, othercon);
 920                        addcon = othercon;
 921                }
 922                else {
 923                        printk("Extra connection from node %d attempted\n", nodeid);
 924                        result = -EAGAIN;
 925                        mutex_unlock(&newcon->sock_mutex);
 926                        goto accept_err;
 927                }
 928        }
 929        else {
 930                newsock->sk->sk_user_data = newcon;
 931                newcon->rx_action = receive_from_sock;
 932                add_sock(newsock, newcon);
 933                addcon = newcon;
 934        }
 935
 936        mutex_unlock(&newcon->sock_mutex);
 937
 938        /*
 939         * Add it to the active queue in case we got data
 940         * between processing the accept adding the socket
 941         * to the read_sockets list
 942         */
 943        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 944                queue_work(recv_workqueue, &addcon->rwork);
 945        mutex_unlock(&con->sock_mutex);
 946
 947        return 0;
 948
 949accept_err:
 950        mutex_unlock(&con->sock_mutex);
 951        sock_release(newsock);
 952
 953        if (result != -EAGAIN)
 954                log_print("error accepting connection from node: %d", result);
 955        return result;
 956}
 957
 958static void free_entry(struct writequeue_entry *e)
 959{
 960        __free_page(e->page);
 961        kfree(e);
 962}
 963
 964/* Initiate an SCTP association.
 965   This is a special case of send_to_sock() in that we don't yet have a
 966   peeled-off socket for this association, so we use the listening socket
 967   and add the primary IP address of the remote node.
 968 */
 969static void sctp_init_assoc(struct connection *con)
 970{
 971        struct sockaddr_storage rem_addr;
 972        char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 973        struct msghdr outmessage;
 974        struct cmsghdr *cmsg;
 975        struct sctp_sndrcvinfo *sinfo;
 976        struct connection *base_con;
 977        struct writequeue_entry *e;
 978        int len, offset;
 979        int ret;
 980        int addrlen;
 981        struct kvec iov[1];
 982
 983        if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
 984                return;
 985
 986        if (con->retries++ > MAX_CONNECT_RETRIES)
 987                return;
 988
 989        if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
 990                log_print("no address for nodeid %d", con->nodeid);
 991                return;
 992        }
 993        base_con = nodeid2con(0, 0);
 994        BUG_ON(base_con == NULL);
 995
 996        make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
 997
 998        outmessage.msg_name = &rem_addr;
 999        outmessage.msg_namelen = addrlen;
1000        outmessage.msg_control = outcmsg;
1001        outmessage.msg_controllen = sizeof(outcmsg);
1002        outmessage.msg_flags = MSG_EOR;
1003
1004        spin_lock(&con->writequeue_lock);
1005
1006        if (list_empty(&con->writequeue)) {
1007                spin_unlock(&con->writequeue_lock);
1008                log_print("writequeue empty for nodeid %d", con->nodeid);
1009                return;
1010        }
1011
1012        e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1013        len = e->len;
1014        offset = e->offset;
1015        spin_unlock(&con->writequeue_lock);
1016
1017        /* Send the first block off the write queue */
1018        iov[0].iov_base = page_address(e->page)+offset;
1019        iov[0].iov_len = len;
1020
1021        cmsg = CMSG_FIRSTHDR(&outmessage);
1022        cmsg->cmsg_level = IPPROTO_SCTP;
1023        cmsg->cmsg_type = SCTP_SNDRCV;
1024        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1025        sinfo = CMSG_DATA(cmsg);
1026        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1027        sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
1028        outmessage.msg_controllen = cmsg->cmsg_len;
1029
1030        ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
1031        if (ret < 0) {
1032                log_print("Send first packet to node %d failed: %d",
1033                          con->nodeid, ret);
1034
1035                /* Try again later */
1036                clear_bit(CF_CONNECT_PENDING, &con->flags);
1037                clear_bit(CF_INIT_PENDING, &con->flags);
1038        }
1039        else {
1040                spin_lock(&con->writequeue_lock);
1041                e->offset += ret;
1042                e->len -= ret;
1043
1044                if (e->len == 0 && e->users == 0) {
1045                        list_del(&e->list);
1046                        free_entry(e);
1047                }
1048                spin_unlock(&con->writequeue_lock);
1049        }
1050}
1051
1052/* Connect a new socket to its peer */
1053static void tcp_connect_to_sock(struct connection *con)
1054{
1055        struct sockaddr_storage saddr, src_addr;
1056        int addr_len;
1057        struct socket *sock = NULL;
1058        int one = 1;
1059        int result;
1060
1061        if (con->nodeid == 0) {
1062                log_print("attempt to connect sock 0 foiled");
1063                return;
1064        }
1065
1066        mutex_lock(&con->sock_mutex);
1067        if (con->retries++ > MAX_CONNECT_RETRIES)
1068                goto out;
1069
1070        /* Some odd races can cause double-connects, ignore them */
1071        if (con->sock)
1072                goto out;
1073
1074        /* Create a socket to communicate with */
1075        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1076                                  IPPROTO_TCP, &sock);
1077        if (result < 0)
1078                goto out_err;
1079
1080        memset(&saddr, 0, sizeof(saddr));
1081        result = nodeid_to_addr(con->nodeid, &saddr, NULL);
1082        if (result < 0) {
1083                log_print("no address for nodeid %d", con->nodeid);
1084                goto out_err;
1085        }
1086
1087        sock->sk->sk_user_data = con;
1088        con->rx_action = receive_from_sock;
1089        con->connect_action = tcp_connect_to_sock;
1090        add_sock(sock, con);
1091
1092        /* Bind to our cluster-known address connecting to avoid
1093           routing problems */
1094        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1095        make_sockaddr(&src_addr, 0, &addr_len);
1096        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1097                                 addr_len);
1098        if (result < 0) {
1099                log_print("could not bind for connect: %d", result);
1100                /* This *may* not indicate a critical error */
1101        }
1102
1103        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1104
1105        log_print("connecting to %d", con->nodeid);
1106
1107        /* Turn off Nagle's algorithm */
1108        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1109                          sizeof(one));
1110
1111        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1112                                   O_NONBLOCK);
1113        if (result == -EINPROGRESS)
1114                result = 0;
1115        if (result == 0)
1116                goto out;
1117
1118out_err:
1119        if (con->sock) {
1120                sock_release(con->sock);
1121                con->sock = NULL;
1122        } else if (sock) {
1123                sock_release(sock);
1124        }
1125        /*
1126         * Some errors are fatal and this list might need adjusting. For other
1127         * errors we try again until the max number of retries is reached.
1128         */
1129        if (result != -EHOSTUNREACH &&
1130            result != -ENETUNREACH &&
1131            result != -ENETDOWN && 
1132            result != -EINVAL &&
1133            result != -EPROTONOSUPPORT) {
1134                log_print("connect %d try %d error %d", con->nodeid,
1135                          con->retries, result);
1136                mutex_unlock(&con->sock_mutex);
1137                msleep(1000);
1138                lowcomms_connect_sock(con);
1139                return;
1140        }
1141out:
1142        mutex_unlock(&con->sock_mutex);
1143        return;
1144}
1145
1146static struct socket *tcp_create_listen_sock(struct connection *con,
1147                                             struct sockaddr_storage *saddr)
1148{
1149        struct socket *sock = NULL;
1150        int result = 0;
1151        int one = 1;
1152        int addr_len;
1153
1154        if (dlm_local_addr[0]->ss_family == AF_INET)
1155                addr_len = sizeof(struct sockaddr_in);
1156        else
1157                addr_len = sizeof(struct sockaddr_in6);
1158
1159        /* Create a socket to communicate with */
1160        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1161                                  IPPROTO_TCP, &sock);
1162        if (result < 0) {
1163                log_print("Can't create listening comms socket");
1164                goto create_out;
1165        }
1166
1167        /* Turn off Nagle's algorithm */
1168        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1169                          sizeof(one));
1170
1171        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1172                                   (char *)&one, sizeof(one));
1173
1174        if (result < 0) {
1175                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1176        }
1177        con->rx_action = tcp_accept_from_sock;
1178        con->connect_action = tcp_connect_to_sock;
1179
1180        /* Bind to our port */
1181        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1182        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1183        if (result < 0) {
1184                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1185                sock_release(sock);
1186                sock = NULL;
1187                con->sock = NULL;
1188                goto create_out;
1189        }
1190        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1191                                 (char *)&one, sizeof(one));
1192        if (result < 0) {
1193                log_print("Set keepalive failed: %d", result);
1194        }
1195
1196        result = sock->ops->listen(sock, 5);
1197        if (result < 0) {
1198                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1199                sock_release(sock);
1200                sock = NULL;
1201                goto create_out;
1202        }
1203
1204create_out:
1205        return sock;
1206}
1207
1208/* Get local addresses */
1209static void init_local(void)
1210{
1211        struct sockaddr_storage sas, *addr;
1212        int i;
1213
1214        dlm_local_count = 0;
1215        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1216                if (dlm_our_addr(&sas, i))
1217                        break;
1218
1219                addr = kmalloc(sizeof(*addr), GFP_NOFS);
1220                if (!addr)
1221                        break;
1222                memcpy(addr, &sas, sizeof(*addr));
1223                dlm_local_addr[dlm_local_count++] = addr;
1224        }
1225}
1226
1227/* Bind to an IP address. SCTP allows multiple address so it can do
1228   multi-homing */
1229static int add_sctp_bind_addr(struct connection *sctp_con,
1230                              struct sockaddr_storage *addr,
1231                              int addr_len, int num)
1232{
1233        int result = 0;
1234
1235        if (num == 1)
1236                result = kernel_bind(sctp_con->sock,
1237                                     (struct sockaddr *) addr,
1238                                     addr_len);
1239        else
1240                result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1241                                           SCTP_SOCKOPT_BINDX_ADD,
1242                                           (char *)addr, addr_len);
1243
1244        if (result < 0)
1245                log_print("Can't bind to port %d addr number %d",
1246                          dlm_config.ci_tcp_port, num);
1247
1248        return result;
1249}
1250
1251/* Initialise SCTP socket and bind to all interfaces */
1252static int sctp_listen_for_all(void)
1253{
1254        struct socket *sock = NULL;
1255        struct sockaddr_storage localaddr;
1256        struct sctp_event_subscribe subscribe;
1257        int result = -EINVAL, num = 1, i, addr_len;
1258        struct connection *con = nodeid2con(0, GFP_NOFS);
1259        int bufsize = NEEDED_RMEM;
1260
1261        if (!con)
1262                return -ENOMEM;
1263
1264        log_print("Using SCTP for communications");
1265
1266        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1267                                  IPPROTO_SCTP, &sock);
1268        if (result < 0) {
1269                log_print("Can't create comms socket, check SCTP is loaded");
1270                goto out;
1271        }
1272
1273        /* Listen for events */
1274        memset(&subscribe, 0, sizeof(subscribe));
1275        subscribe.sctp_data_io_event = 1;
1276        subscribe.sctp_association_event = 1;
1277        subscribe.sctp_send_failure_event = 1;
1278        subscribe.sctp_shutdown_event = 1;
1279        subscribe.sctp_partial_delivery_event = 1;
1280
1281        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1282                                 (char *)&bufsize, sizeof(bufsize));
1283        if (result)
1284                log_print("Error increasing buffer space on socket %d", result);
1285
1286        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1287                                   (char *)&subscribe, sizeof(subscribe));
1288        if (result < 0) {
1289                log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1290                          result);
1291                goto create_delsock;
1292        }
1293
1294        /* Init con struct */
1295        sock->sk->sk_user_data = con;
1296        con->sock = sock;
1297        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1298        con->rx_action = receive_from_sock;
1299        con->connect_action = sctp_init_assoc;
1300
1301        /* Bind to all interfaces. */
1302        for (i = 0; i < dlm_local_count; i++) {
1303                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1304                make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1305
1306                result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1307                if (result)
1308                        goto create_delsock;
1309                ++num;
1310        }
1311
1312        result = sock->ops->listen(sock, 5);
1313        if (result < 0) {
1314                log_print("Can't set socket listening");
1315                goto create_delsock;
1316        }
1317
1318        return 0;
1319
1320create_delsock:
1321        sock_release(sock);
1322        con->sock = NULL;
1323out:
1324        return result;
1325}
1326
1327static int tcp_listen_for_all(void)
1328{
1329        struct socket *sock = NULL;
1330        struct connection *con = nodeid2con(0, GFP_NOFS);
1331        int result = -EINVAL;
1332
1333        if (!con)
1334                return -ENOMEM;
1335
1336        /* We don't support multi-homed hosts */
1337        if (dlm_local_addr[1] != NULL) {
1338                log_print("TCP protocol can't handle multi-homed hosts, "
1339                          "try SCTP");
1340                return -EINVAL;
1341        }
1342
1343        log_print("Using TCP for communications");
1344
1345        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1346        if (sock) {
1347                add_sock(sock, con);
1348                result = 0;
1349        }
1350        else {
1351                result = -EADDRINUSE;
1352        }
1353
1354        return result;
1355}
1356
1357
1358
1359static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1360                                                     gfp_t allocation)
1361{
1362        struct writequeue_entry *entry;
1363
1364        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1365        if (!entry)
1366                return NULL;
1367
1368        entry->page = alloc_page(allocation);
1369        if (!entry->page) {
1370                kfree(entry);
1371                return NULL;
1372        }
1373
1374        entry->offset = 0;
1375        entry->len = 0;
1376        entry->end = 0;
1377        entry->users = 0;
1378        entry->con = con;
1379
1380        return entry;
1381}
1382
1383void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1384{
1385        struct connection *con;
1386        struct writequeue_entry *e;
1387        int offset = 0;
1388        int users = 0;
1389
1390        con = nodeid2con(nodeid, allocation);
1391        if (!con)
1392                return NULL;
1393
1394        spin_lock(&con->writequeue_lock);
1395        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1396        if ((&e->list == &con->writequeue) ||
1397            (PAGE_CACHE_SIZE - e->end < len)) {
1398                e = NULL;
1399        } else {
1400                offset = e->end;
1401                e->end += len;
1402                users = e->users++;
1403        }
1404        spin_unlock(&con->writequeue_lock);
1405
1406        if (e) {
1407        got_one:
1408                *ppc = page_address(e->page) + offset;
1409                return e;
1410        }
1411
1412        e = new_writequeue_entry(con, allocation);
1413        if (e) {
1414                spin_lock(&con->writequeue_lock);
1415                offset = e->end;
1416                e->end += len;
1417                users = e->users++;
1418                list_add_tail(&e->list, &con->writequeue);
1419                spin_unlock(&con->writequeue_lock);
1420                goto got_one;
1421        }
1422        return NULL;
1423}
1424
1425void dlm_lowcomms_commit_buffer(void *mh)
1426{
1427        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1428        struct connection *con = e->con;
1429        int users;
1430
1431        spin_lock(&con->writequeue_lock);
1432        users = --e->users;
1433        if (users)
1434                goto out;
1435        e->len = e->end - e->offset;
1436        spin_unlock(&con->writequeue_lock);
1437
1438        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1439                queue_work(send_workqueue, &con->swork);
1440        }
1441        return;
1442
1443out:
1444        spin_unlock(&con->writequeue_lock);
1445        return;
1446}
1447
1448/* Send a message */
1449static void send_to_sock(struct connection *con)
1450{
1451        int ret = 0;
1452        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1453        struct writequeue_entry *e;
1454        int len, offset;
1455        int count = 0;
1456
1457        mutex_lock(&con->sock_mutex);
1458        if (con->sock == NULL)
1459                goto out_connect;
1460
1461        spin_lock(&con->writequeue_lock);
1462        for (;;) {
1463                e = list_entry(con->writequeue.next, struct writequeue_entry,
1464                               list);
1465                if ((struct list_head *) e == &con->writequeue)
1466                        break;
1467
1468                len = e->len;
1469                offset = e->offset;
1470                BUG_ON(len == 0 && e->users == 0);
1471                spin_unlock(&con->writequeue_lock);
1472
1473                ret = 0;
1474                if (len) {
1475                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1476                                              msg_flags);
1477                        if (ret == -EAGAIN || ret == 0) {
1478                                if (ret == -EAGAIN &&
1479                                    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1480                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1481                                        /* Notify TCP that we're limited by the
1482                                         * application window size.
1483                                         */
1484                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
1485                                        con->sock->sk->sk_write_pending++;
1486                                }
1487                                cond_resched();
1488                                goto out;
1489                        } else if (ret < 0)
1490                                goto send_error;
1491                }
1492
1493                /* Don't starve people filling buffers */
1494                if (++count >= MAX_SEND_MSG_COUNT) {
1495                        cond_resched();
1496                        count = 0;
1497                }
1498
1499                spin_lock(&con->writequeue_lock);
1500                e->offset += ret;
1501                e->len -= ret;
1502
1503                if (e->len == 0 && e->users == 0) {
1504                        list_del(&e->list);
1505                        free_entry(e);
1506                }
1507        }
1508        spin_unlock(&con->writequeue_lock);
1509out:
1510        mutex_unlock(&con->sock_mutex);
1511        return;
1512
1513send_error:
1514        mutex_unlock(&con->sock_mutex);
1515        close_connection(con, false);
1516        lowcomms_connect_sock(con);
1517        return;
1518
1519out_connect:
1520        mutex_unlock(&con->sock_mutex);
1521        if (!test_bit(CF_INIT_PENDING, &con->flags))
1522                lowcomms_connect_sock(con);
1523}
1524
1525static void clean_one_writequeue(struct connection *con)
1526{
1527        struct writequeue_entry *e, *safe;
1528
1529        spin_lock(&con->writequeue_lock);
1530        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1531                list_del(&e->list);
1532                free_entry(e);
1533        }
1534        spin_unlock(&con->writequeue_lock);
1535}
1536
1537/* Called from recovery when it knows that a node has
1538   left the cluster */
1539int dlm_lowcomms_close(int nodeid)
1540{
1541        struct connection *con;
1542        struct dlm_node_addr *na;
1543
1544        log_print("closing connection to node %d", nodeid);
1545        con = nodeid2con(nodeid, 0);
1546        if (con) {
1547                clear_bit(CF_CONNECT_PENDING, &con->flags);
1548                clear_bit(CF_WRITE_PENDING, &con->flags);
1549                set_bit(CF_CLOSE, &con->flags);
1550                if (cancel_work_sync(&con->swork))
1551                        log_print("canceled swork for node %d", nodeid);
1552                if (cancel_work_sync(&con->rwork))
1553                        log_print("canceled rwork for node %d", nodeid);
1554                clean_one_writequeue(con);
1555                close_connection(con, true);
1556        }
1557
1558        spin_lock(&dlm_node_addrs_spin);
1559        na = find_node_addr(nodeid);
1560        if (na) {
1561                list_del(&na->list);
1562                while (na->addr_count--)
1563                        kfree(na->addr[na->addr_count]);
1564                kfree(na);
1565        }
1566        spin_unlock(&dlm_node_addrs_spin);
1567
1568        return 0;
1569}
1570
1571/* Receive workqueue function */
1572static void process_recv_sockets(struct work_struct *work)
1573{
1574        struct connection *con = container_of(work, struct connection, rwork);
1575        int err;
1576
1577        clear_bit(CF_READ_PENDING, &con->flags);
1578        do {
1579                err = con->rx_action(con);
1580        } while (!err);
1581}
1582
1583/* Send workqueue function */
1584static void process_send_sockets(struct work_struct *work)
1585{
1586        struct connection *con = container_of(work, struct connection, swork);
1587
1588        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1589                con->connect_action(con);
1590                set_bit(CF_WRITE_PENDING, &con->flags);
1591        }
1592        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1593                send_to_sock(con);
1594}
1595
1596
1597/* Discard all entries on the write queues */
1598static void clean_writequeues(void)
1599{
1600        foreach_conn(clean_one_writequeue);
1601}
1602
1603static void work_stop(void)
1604{
1605        destroy_workqueue(recv_workqueue);
1606        destroy_workqueue(send_workqueue);
1607}
1608
1609static int work_start(void)
1610{
1611        recv_workqueue = alloc_workqueue("dlm_recv",
1612                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1613        if (!recv_workqueue) {
1614                log_print("can't start dlm_recv");
1615                return -ENOMEM;
1616        }
1617
1618        send_workqueue = alloc_workqueue("dlm_send",
1619                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1620        if (!send_workqueue) {
1621                log_print("can't start dlm_send");
1622                destroy_workqueue(recv_workqueue);
1623                return -ENOMEM;
1624        }
1625
1626        return 0;
1627}
1628
1629static void stop_conn(struct connection *con)
1630{
1631        con->flags |= 0x0F;
1632        if (con->sock && con->sock->sk)
1633                con->sock->sk->sk_user_data = NULL;
1634}
1635
1636static void free_conn(struct connection *con)
1637{
1638        close_connection(con, true);
1639        if (con->othercon)
1640                kmem_cache_free(con_cache, con->othercon);
1641        hlist_del(&con->list);
1642        kmem_cache_free(con_cache, con);
1643}
1644
1645void dlm_lowcomms_stop(void)
1646{
1647        /* Set all the flags to prevent any
1648           socket activity.
1649        */
1650        mutex_lock(&connections_lock);
1651        dlm_allow_conn = 0;
1652        foreach_conn(stop_conn);
1653        mutex_unlock(&connections_lock);
1654
1655        work_stop();
1656
1657        mutex_lock(&connections_lock);
1658        clean_writequeues();
1659
1660        foreach_conn(free_conn);
1661
1662        mutex_unlock(&connections_lock);
1663        kmem_cache_destroy(con_cache);
1664}
1665
1666int dlm_lowcomms_start(void)
1667{
1668        int error = -EINVAL;
1669        struct connection *con;
1670        int i;
1671
1672        for (i = 0; i < CONN_HASH_SIZE; i++)
1673                INIT_HLIST_HEAD(&connection_hash[i]);
1674
1675        init_local();
1676        if (!dlm_local_count) {
1677                error = -ENOTCONN;
1678                log_print("no local IP address has been set");
1679                goto fail;
1680        }
1681
1682        error = -ENOMEM;
1683        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1684                                      __alignof__(struct connection), 0,
1685                                      NULL);
1686        if (!con_cache)
1687                goto fail;
1688
1689        error = work_start();
1690        if (error)
1691                goto fail_destroy;
1692
1693        dlm_allow_conn = 1;
1694
1695        /* Start listening */
1696        if (dlm_config.ci_protocol == 0)
1697                error = tcp_listen_for_all();
1698        else
1699                error = sctp_listen_for_all();
1700        if (error)
1701                goto fail_unlisten;
1702
1703        return 0;
1704
1705fail_unlisten:
1706        dlm_allow_conn = 0;
1707        con = nodeid2con(0,0);
1708        if (con) {
1709                close_connection(con, false);
1710                kmem_cache_free(con_cache, con);
1711        }
1712fail_destroy:
1713        kmem_cache_destroy(con_cache);
1714fail:
1715        return error;
1716}
1717
1718void dlm_lowcomms_exit(void)
1719{
1720        struct dlm_node_addr *na, *safe;
1721
1722        spin_lock(&dlm_node_addrs_spin);
1723        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1724                list_del(&na->list);
1725                while (na->addr_count--)
1726                        kfree(na->addr[na->addr_count]);
1727                kfree(na);
1728        }
1729        spin_unlock(&dlm_node_addrs_spin);
1730}
1731
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.