linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
  29 * The comms level is two kernel threads that deal mainly with
  30 * the receiving of messages from other nodes and passing them
  31 * up to the mid-level comms layer (which understands the
  32 * message format) for execution by the locking core, and
  33 * a send thread which does all the setting up of connections
  34 * to remote nodes and the sending of data. Threads are not allowed
  35 * to send their own data because it may cause them to wait in times
  36 * of high load. Also, this way, the sending thread can collect together
  37 * messages bound for one node and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/user.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
  63#define NEEDED_RMEM (4*1024*1024)
  64#define CONN_HASH_SIZE 32
  65
  66struct cbuf {
  67        unsigned int base;
  68        unsigned int len;
  69        unsigned int mask;
  70};
  71
  72static void cbuf_add(struct cbuf *cb, int n)
  73{
  74        cb->len += n;
  75}
  76
  77static int cbuf_data(struct cbuf *cb)
  78{
  79        return ((cb->base + cb->len) & cb->mask);
  80}
  81
  82static void cbuf_init(struct cbuf *cb, int size)
  83{
  84        cb->base = cb->len = 0;
  85        cb->mask = size-1;
  86}
  87
  88static void cbuf_eat(struct cbuf *cb, int n)
  89{
  90        cb->len  -= n;
  91        cb->base += n;
  92        cb->base &= cb->mask;
  93}
  94
  95static bool cbuf_empty(struct cbuf *cb)
  96{
  97        return cb->len == 0;
  98}
  99
 100struct connection {
 101        struct socket *sock;    /* NULL if not connected */
 102        uint32_t nodeid;        /* So we know who we are in the list */
 103        struct mutex sock_mutex;
 104        unsigned long flags;
 105#define CF_READ_PENDING 1
 106#define CF_WRITE_PENDING 2
 107#define CF_CONNECT_PENDING 3
 108#define CF_INIT_PENDING 4
 109#define CF_IS_OTHERCON 5
 110#define CF_CLOSE 6
 111        struct list_head writequeue;  /* List of outgoing writequeue_entries */
 112        spinlock_t writequeue_lock;
 113        int (*rx_action) (struct connection *); /* What to do when active */
 114        void (*connect_action) (struct connection *);   /* What to do to connect */
 115        struct page *rx_page;
 116        struct cbuf cb;
 117        int retries;
 118#define MAX_CONNECT_RETRIES 3
 119        int sctp_assoc;
 120        struct hlist_node list;
 121        struct connection *othercon;
 122        struct work_struct rwork; /* Receive workqueue */
 123        struct work_struct swork; /* Send workqueue */
 124};
 125#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 126
 127/* An entry waiting to be sent */
 128struct writequeue_entry {
 129        struct list_head list;
 130        struct page *page;
 131        int offset;
 132        int len;
 133        int end;
 134        int users;
 135        struct connection *con;
 136};
 137
 138static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 139static int dlm_local_count;
 140
 141/* Work queues */
 142static struct workqueue_struct *recv_workqueue;
 143static struct workqueue_struct *send_workqueue;
 144
 145static struct hlist_head connection_hash[CONN_HASH_SIZE];
 146static DEFINE_MUTEX(connections_lock);
 147static struct kmem_cache *con_cache;
 148
 149static void process_recv_sockets(struct work_struct *work);
 150static void process_send_sockets(struct work_struct *work);
 151
 152
 153/* This is deliberately very simple because most clusters have simple
 154   sequential nodeids, so we should be able to go straight to a connection
 155   struct in the array */
 156static inline int nodeid_hash(int nodeid)
 157{
 158        return nodeid & (CONN_HASH_SIZE-1);
 159}
 160
 161static struct connection *__find_con(int nodeid)
 162{
 163        int r;
 164        struct hlist_node *h;
 165        struct connection *con;
 166
 167        r = nodeid_hash(nodeid);
 168
 169        hlist_for_each_entry(con, h, &connection_hash[r], list) {
 170                if (con->nodeid == nodeid)
 171                        return con;
 172        }
 173        return NULL;
 174}
 175
 176/*
 177 * If 'allocation' is zero then we don't attempt to create a new
 178 * connection structure for this node.
 179 */
 180static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 181{
 182        struct connection *con = NULL;
 183        int r;
 184
 185        con = __find_con(nodeid);
 186        if (con || !alloc)
 187                return con;
 188
 189        con = kmem_cache_zalloc(con_cache, alloc);
 190        if (!con)
 191                return NULL;
 192
 193        r = nodeid_hash(nodeid);
 194        hlist_add_head(&con->list, &connection_hash[r]);
 195
 196        con->nodeid = nodeid;
 197        mutex_init(&con->sock_mutex);
 198        INIT_LIST_HEAD(&con->writequeue);
 199        spin_lock_init(&con->writequeue_lock);
 200        INIT_WORK(&con->swork, process_send_sockets);
 201        INIT_WORK(&con->rwork, process_recv_sockets);
 202
 203        /* Setup action pointers for child sockets */
 204        if (con->nodeid) {
 205                struct connection *zerocon = __find_con(0);
 206
 207                con->connect_action = zerocon->connect_action;
 208                if (!con->rx_action)
 209                        con->rx_action = zerocon->rx_action;
 210        }
 211
 212        return con;
 213}
 214
 215/* Loop round all connections */
 216static void foreach_conn(void (*conn_func)(struct connection *c))
 217{
 218        int i;
 219        struct hlist_node *h, *n;
 220        struct connection *con;
 221
 222        for (i = 0; i < CONN_HASH_SIZE; i++) {
 223                hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
 224                        conn_func(con);
 225                }
 226        }
 227}
 228
 229static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 230{
 231        struct connection *con;
 232
 233        mutex_lock(&connections_lock);
 234        con = __nodeid2con(nodeid, allocation);
 235        mutex_unlock(&connections_lock);
 236
 237        return con;
 238}
 239
 240/* This is a bit drastic, but only called when things go wrong */
 241static struct connection *assoc2con(int assoc_id)
 242{
 243        int i;
 244        struct hlist_node *h;
 245        struct connection *con;
 246
 247        mutex_lock(&connections_lock);
 248
 249        for (i = 0 ; i < CONN_HASH_SIZE; i++) {
 250                hlist_for_each_entry(con, h, &connection_hash[i], list) {
 251                        if (con && con->sctp_assoc == assoc_id) {
 252                                mutex_unlock(&connections_lock);
 253                                return con;
 254                        }
 255                }
 256        }
 257        mutex_unlock(&connections_lock);
 258        return NULL;
 259}
 260
 261static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
 262{
 263        struct sockaddr_storage addr;
 264        int error;
 265
 266        if (!dlm_local_count)
 267                return -1;
 268
 269        error = dlm_nodeid_to_addr(nodeid, &addr);
 270        if (error)
 271                return error;
 272
 273        if (dlm_local_addr[0]->ss_family == AF_INET) {
 274                struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
 275                struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
 276                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 277        } else {
 278                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
 279                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
 280                ipv6_addr_copy(&ret6->sin6_addr, &in6->sin6_addr);
 281        }
 282
 283        return 0;
 284}
 285
 286/* Data available on socket or listen socket received a connect */
 287static void lowcomms_data_ready(struct sock *sk, int count_unused)
 288{
 289        struct connection *con = sock2con(sk);
 290        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 291                queue_work(recv_workqueue, &con->rwork);
 292}
 293
 294static void lowcomms_write_space(struct sock *sk)
 295{
 296        struct connection *con = sock2con(sk);
 297
 298        if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 299                queue_work(send_workqueue, &con->swork);
 300}
 301
 302static inline void lowcomms_connect_sock(struct connection *con)
 303{
 304        if (test_bit(CF_CLOSE, &con->flags))
 305                return;
 306        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 307                queue_work(send_workqueue, &con->swork);
 308}
 309
 310static void lowcomms_state_change(struct sock *sk)
 311{
 312        if (sk->sk_state == TCP_ESTABLISHED)
 313                lowcomms_write_space(sk);
 314}
 315
 316int dlm_lowcomms_connect_node(int nodeid)
 317{
 318        struct connection *con;
 319
 320        /* with sctp there's no connecting without sending */
 321        if (dlm_config.ci_protocol != 0)
 322                return 0;
 323
 324        if (nodeid == dlm_our_nodeid())
 325                return 0;
 326
 327        con = nodeid2con(nodeid, GFP_NOFS);
 328        if (!con)
 329                return -ENOMEM;
 330        lowcomms_connect_sock(con);
 331        return 0;
 332}
 333
 334/* Make a socket active */
 335static int add_sock(struct socket *sock, struct connection *con)
 336{
 337        con->sock = sock;
 338
 339        /* Install a data_ready callback */
 340        con->sock->sk->sk_data_ready = lowcomms_data_ready;
 341        con->sock->sk->sk_write_space = lowcomms_write_space;
 342        con->sock->sk->sk_state_change = lowcomms_state_change;
 343        con->sock->sk->sk_user_data = con;
 344        con->sock->sk->sk_allocation = GFP_NOFS;
 345        return 0;
 346}
 347
 348/* Add the port number to an IPv6 or 4 sockaddr and return the address
 349   length */
 350static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 351                          int *addr_len)
 352{
 353        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 354        if (saddr->ss_family == AF_INET) {
 355                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 356                in4_addr->sin_port = cpu_to_be16(port);
 357                *addr_len = sizeof(struct sockaddr_in);
 358                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 359        } else {
 360                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 361                in6_addr->sin6_port = cpu_to_be16(port);
 362                *addr_len = sizeof(struct sockaddr_in6);
 363        }
 364        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 365}
 366
 367/* Close a remote connection and tidy up */
 368static void close_connection(struct connection *con, bool and_other)
 369{
 370        mutex_lock(&con->sock_mutex);
 371
 372        if (con->sock) {
 373                sock_release(con->sock);
 374                con->sock = NULL;
 375        }
 376        if (con->othercon && and_other) {
 377                /* Will only re-enter once. */
 378                close_connection(con->othercon, false);
 379        }
 380        if (con->rx_page) {
 381                __free_page(con->rx_page);
 382                con->rx_page = NULL;
 383        }
 384
 385        con->retries = 0;
 386        mutex_unlock(&con->sock_mutex);
 387}
 388
 389/* We only send shutdown messages to nodes that are not part of the cluster */
 390static void sctp_send_shutdown(sctp_assoc_t associd)
 391{
 392        static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 393        struct msghdr outmessage;
 394        struct cmsghdr *cmsg;
 395        struct sctp_sndrcvinfo *sinfo;
 396        int ret;
 397        struct connection *con;
 398
 399        con = nodeid2con(0,0);
 400        BUG_ON(con == NULL);
 401
 402        outmessage.msg_name = NULL;
 403        outmessage.msg_namelen = 0;
 404        outmessage.msg_control = outcmsg;
 405        outmessage.msg_controllen = sizeof(outcmsg);
 406        outmessage.msg_flags = MSG_EOR;
 407
 408        cmsg = CMSG_FIRSTHDR(&outmessage);
 409        cmsg->cmsg_level = IPPROTO_SCTP;
 410        cmsg->cmsg_type = SCTP_SNDRCV;
 411        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 412        outmessage.msg_controllen = cmsg->cmsg_len;
 413        sinfo = CMSG_DATA(cmsg);
 414        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 415
 416        sinfo->sinfo_flags |= MSG_EOF;
 417        sinfo->sinfo_assoc_id = associd;
 418
 419        ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
 420
 421        if (ret != 0)
 422                log_print("send EOF to node failed: %d", ret);
 423}
 424
 425static void sctp_init_failed_foreach(struct connection *con)
 426{
 427        con->sctp_assoc = 0;
 428        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
 429                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 430                        queue_work(send_workqueue, &con->swork);
 431        }
 432}
 433
 434/* INIT failed but we don't know which node...
 435   restart INIT on all pending nodes */
 436static void sctp_init_failed(void)
 437{
 438        mutex_lock(&connections_lock);
 439
 440        foreach_conn(sctp_init_failed_foreach);
 441
 442        mutex_unlock(&connections_lock);
 443}
 444
 445/* Something happened to an association */
 446static void process_sctp_notification(struct connection *con,
 447                                      struct msghdr *msg, char *buf)
 448{
 449        union sctp_notification *sn = (union sctp_notification *)buf;
 450
 451        if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
 452                switch (sn->sn_assoc_change.sac_state) {
 453
 454                case SCTP_COMM_UP:
 455                case SCTP_RESTART:
 456                {
 457                        /* Check that the new node is in the lockspace */
 458                        struct sctp_prim prim;
 459                        int nodeid;
 460                        int prim_len, ret;
 461                        int addr_len;
 462                        struct connection *new_con;
 463                        sctp_peeloff_arg_t parg;
 464                        int parglen = sizeof(parg);
 465                        int err;
 466
 467                        /*
 468                         * We get this before any data for an association.
 469                         * We verify that the node is in the cluster and
 470                         * then peel off a socket for it.
 471                         */
 472                        if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
 473                                log_print("COMM_UP for invalid assoc ID %d",
 474                                         (int)sn->sn_assoc_change.sac_assoc_id);
 475                                sctp_init_failed();
 476                                return;
 477                        }
 478                        memset(&prim, 0, sizeof(struct sctp_prim));
 479                        prim_len = sizeof(struct sctp_prim);
 480                        prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
 481
 482                        ret = kernel_getsockopt(con->sock,
 483                                                IPPROTO_SCTP,
 484                                                SCTP_PRIMARY_ADDR,
 485                                                (char*)&prim,
 486                                                &prim_len);
 487                        if (ret < 0) {
 488                                log_print("getsockopt/sctp_primary_addr on "
 489                                          "new assoc %d failed : %d",
 490                                          (int)sn->sn_assoc_change.sac_assoc_id,
 491                                          ret);
 492
 493                                /* Retry INIT later */
 494                                new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 495                                if (new_con)
 496                                        clear_bit(CF_CONNECT_PENDING, &con->flags);
 497                                return;
 498                        }
 499                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 500                        if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 501                                int i;
 502                                unsigned char *b=(unsigned char *)&prim.ssp_addr;
 503                                log_print("reject connect from unknown addr");
 504                                for (i=0; i<sizeof(struct sockaddr_storage);i++)
 505                                        printk("%02x ", b[i]);
 506                                printk("\n");
 507                                sctp_send_shutdown(prim.ssp_assoc_id);
 508                                return;
 509                        }
 510
 511                        new_con = nodeid2con(nodeid, GFP_NOFS);
 512                        if (!new_con)
 513                                return;
 514
 515                        /* Peel off a new sock */
 516                        parg.associd = sn->sn_assoc_change.sac_assoc_id;
 517                        ret = kernel_getsockopt(con->sock, IPPROTO_SCTP,
 518                                                SCTP_SOCKOPT_PEELOFF,
 519                                                (void *)&parg, &parglen);
 520                        if (ret < 0) {
 521                                log_print("Can't peel off a socket for "
 522                                          "connection %d to node %d: err=%d",
 523                                          parg.associd, nodeid, ret);
 524                                return;
 525                        }
 526                        new_con->sock = sockfd_lookup(parg.sd, &err);
 527                        if (!new_con->sock) {
 528                                log_print("sockfd_lookup error %d", err);
 529                                return;
 530                        }
 531                        add_sock(new_con->sock, new_con);
 532                        sockfd_put(new_con->sock);
 533
 534                        log_print("connecting to %d sctp association %d",
 535                                 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
 536
 537                        /* Send any pending writes */
 538                        clear_bit(CF_CONNECT_PENDING, &new_con->flags);
 539                        clear_bit(CF_INIT_PENDING, &con->flags);
 540                        if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
 541                                queue_work(send_workqueue, &new_con->swork);
 542                        }
 543                        if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
 544                                queue_work(recv_workqueue, &new_con->rwork);
 545                }
 546                break;
 547
 548                case SCTP_COMM_LOST:
 549                case SCTP_SHUTDOWN_COMP:
 550                {
 551                        con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 552                        if (con) {
 553                                con->sctp_assoc = 0;
 554                        }
 555                }
 556                break;
 557
 558                /* We don't know which INIT failed, so clear the PENDING flags
 559                 * on them all.  if assoc_id is zero then it will then try
 560                 * again */
 561
 562                case SCTP_CANT_STR_ASSOC:
 563                {
 564                        log_print("Can't start SCTP association - retrying");
 565                        sctp_init_failed();
 566                }
 567                break;
 568
 569                default:
 570                        log_print("unexpected SCTP assoc change id=%d state=%d",
 571                                  (int)sn->sn_assoc_change.sac_assoc_id,
 572                                  sn->sn_assoc_change.sac_state);
 573                }
 574        }
 575}
 576
 577/* Data received from remote end */
 578static int receive_from_sock(struct connection *con)
 579{
 580        int ret = 0;
 581        struct msghdr msg = {};
 582        struct kvec iov[2];
 583        unsigned len;
 584        int r;
 585        int call_again_soon = 0;
 586        int nvec;
 587        char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 588
 589        mutex_lock(&con->sock_mutex);
 590
 591        if (con->sock == NULL) {
 592                ret = -EAGAIN;
 593                goto out_close;
 594        }
 595
 596        if (con->rx_page == NULL) {
 597                /*
 598                 * This doesn't need to be atomic, but I think it should
 599                 * improve performance if it is.
 600                 */
 601                con->rx_page = alloc_page(GFP_ATOMIC);
 602                if (con->rx_page == NULL)
 603                        goto out_resched;
 604                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
 605        }
 606
 607        /* Only SCTP needs these really */
 608        memset(&incmsg, 0, sizeof(incmsg));
 609        msg.msg_control = incmsg;
 610        msg.msg_controllen = sizeof(incmsg);
 611
 612        /*
 613         * iov[0] is the bit of the circular buffer between the current end
 614         * point (cb.base + cb.len) and the end of the buffer.
 615         */
 616        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
 617        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 618        iov[1].iov_len = 0;
 619        nvec = 1;
 620
 621        /*
 622         * iov[1] is the bit of the circular buffer between the start of the
 623         * buffer and the start of the currently used section (cb.base)
 624         */
 625        if (cbuf_data(&con->cb) >= con->cb.base) {
 626                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
 627                iov[1].iov_len = con->cb.base;
 628                iov[1].iov_base = page_address(con->rx_page);
 629                nvec = 2;
 630        }
 631        len = iov[0].iov_len + iov[1].iov_len;
 632
 633        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
 634                               MSG_DONTWAIT | MSG_NOSIGNAL);
 635        if (ret <= 0)
 636                goto out_close;
 637
 638        /* Process SCTP notifications */
 639        if (msg.msg_flags & MSG_NOTIFICATION) {
 640                msg.msg_control = incmsg;
 641                msg.msg_controllen = sizeof(incmsg);
 642
 643                process_sctp_notification(con, &msg,
 644                                page_address(con->rx_page) + con->cb.base);
 645                mutex_unlock(&con->sock_mutex);
 646                return 0;
 647        }
 648        BUG_ON(con->nodeid == 0);
 649
 650        if (ret == len)
 651                call_again_soon = 1;
 652        cbuf_add(&con->cb, ret);
 653        ret = dlm_process_incoming_buffer(con->nodeid,
 654                                          page_address(con->rx_page),
 655                                          con->cb.base, con->cb.len,
 656                                          PAGE_CACHE_SIZE);
 657        if (ret == -EBADMSG) {
 658                log_print("lowcomms: addr=%p, base=%u, len=%u, "
 659                          "iov_len=%u, iov_base[0]=%p, read=%d",
 660                          page_address(con->rx_page), con->cb.base, con->cb.len,
 661                          len, iov[0].iov_base, r);
 662        }
 663        if (ret < 0)
 664                goto out_close;
 665        cbuf_eat(&con->cb, ret);
 666
 667        if (cbuf_empty(&con->cb) && !call_again_soon) {
 668                __free_page(con->rx_page);
 669                con->rx_page = NULL;
 670        }
 671
 672        if (call_again_soon)
 673                goto out_resched;
 674        mutex_unlock(&con->sock_mutex);
 675        return 0;
 676
 677out_resched:
 678        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
 679                queue_work(recv_workqueue, &con->rwork);
 680        mutex_unlock(&con->sock_mutex);
 681        return -EAGAIN;
 682
 683out_close:
 684        mutex_unlock(&con->sock_mutex);
 685        if (ret != -EAGAIN) {
 686                close_connection(con, false);
 687                /* Reconnect when there is something to send */
 688        }
 689        /* Don't return success if we really got EOF */
 690        if (ret == 0)
 691                ret = -EAGAIN;
 692
 693        return ret;
 694}
 695
 696/* Listening socket is busy, accept a connection */
 697static int tcp_accept_from_sock(struct connection *con)
 698{
 699        int result;
 700        struct sockaddr_storage peeraddr;
 701        struct socket *newsock;
 702        int len;
 703        int nodeid;
 704        struct connection *newcon;
 705        struct connection *addcon;
 706
 707        memset(&peeraddr, 0, sizeof(peeraddr));
 708        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 709                                  IPPROTO_TCP, &newsock);
 710        if (result < 0)
 711                return -ENOMEM;
 712
 713        mutex_lock_nested(&con->sock_mutex, 0);
 714
 715        result = -ENOTCONN;
 716        if (con->sock == NULL)
 717                goto accept_err;
 718
 719        newsock->type = con->sock->type;
 720        newsock->ops = con->sock->ops;
 721
 722        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 723        if (result < 0)
 724                goto accept_err;
 725
 726        /* Get the connected socket's peer */
 727        memset(&peeraddr, 0, sizeof(peeraddr));
 728        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 729                                  &len, 2)) {
 730                result = -ECONNABORTED;
 731                goto accept_err;
 732        }
 733
 734        /* Get the new node's NODEID */
 735        make_sockaddr(&peeraddr, 0, &len);
 736        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
 737                log_print("connect from non cluster node");
 738                sock_release(newsock);
 739                mutex_unlock(&con->sock_mutex);
 740                return -1;
 741        }
 742
 743        log_print("got connection from %d", nodeid);
 744
 745        /*  Check to see if we already have a connection to this node. This
 746         *  could happen if the two nodes initiate a connection at roughly
 747         *  the same time and the connections cross on the wire.
 748         *  In this case we store the incoming one in "othercon"
 749         */
 750        newcon = nodeid2con(nodeid, GFP_NOFS);
 751        if (!newcon) {
 752                result = -ENOMEM;
 753                goto accept_err;
 754        }
 755        mutex_lock_nested(&newcon->sock_mutex, 1);
 756        if (newcon->sock) {
 757                struct connection *othercon = newcon->othercon;
 758
 759                if (!othercon) {
 760                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 761                        if (!othercon) {
 762                                log_print("failed to allocate incoming socket");
 763                                mutex_unlock(&newcon->sock_mutex);
 764                                result = -ENOMEM;
 765                                goto accept_err;
 766                        }
 767                        othercon->nodeid = nodeid;
 768                        othercon->rx_action = receive_from_sock;
 769                        mutex_init(&othercon->sock_mutex);
 770                        INIT_WORK(&othercon->swork, process_send_sockets);
 771                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 772                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 773                }
 774                if (!othercon->sock) {
 775                        newcon->othercon = othercon;
 776                        othercon->sock = newsock;
 777                        newsock->sk->sk_user_data = othercon;
 778                        add_sock(newsock, othercon);
 779                        addcon = othercon;
 780                }
 781                else {
 782                        printk("Extra connection from node %d attempted\n", nodeid);
 783                        result = -EAGAIN;
 784                        mutex_unlock(&newcon->sock_mutex);
 785                        goto accept_err;
 786                }
 787        }
 788        else {
 789                newsock->sk->sk_user_data = newcon;
 790                newcon->rx_action = receive_from_sock;
 791                add_sock(newsock, newcon);
 792                addcon = newcon;
 793        }
 794
 795        mutex_unlock(&newcon->sock_mutex);
 796
 797        /*
 798         * Add it to the active queue in case we got data
 799         * beween processing the accept adding the socket
 800         * to the read_sockets list
 801         */
 802        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 803                queue_work(recv_workqueue, &addcon->rwork);
 804        mutex_unlock(&con->sock_mutex);
 805
 806        return 0;
 807
 808accept_err:
 809        mutex_unlock(&con->sock_mutex);
 810        sock_release(newsock);
 811
 812        if (result != -EAGAIN)
 813                log_print("error accepting connection from node: %d", result);
 814        return result;
 815}
 816
 817static void free_entry(struct writequeue_entry *e)
 818{
 819        __free_page(e->page);
 820        kfree(e);
 821}
 822
 823/* Initiate an SCTP association.
 824   This is a special case of send_to_sock() in that we don't yet have a
 825   peeled-off socket for this association, so we use the listening socket
 826   and add the primary IP address of the remote node.
 827 */
 828static void sctp_init_assoc(struct connection *con)
 829{
 830        struct sockaddr_storage rem_addr;
 831        char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 832        struct msghdr outmessage;
 833        struct cmsghdr *cmsg;
 834        struct sctp_sndrcvinfo *sinfo;
 835        struct connection *base_con;
 836        struct writequeue_entry *e;
 837        int len, offset;
 838        int ret;
 839        int addrlen;
 840        struct kvec iov[1];
 841
 842        if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
 843                return;
 844
 845        if (con->retries++ > MAX_CONNECT_RETRIES)
 846                return;
 847
 848        if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
 849                log_print("no address for nodeid %d", con->nodeid);
 850                return;
 851        }
 852        base_con = nodeid2con(0, 0);
 853        BUG_ON(base_con == NULL);
 854
 855        make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
 856
 857        outmessage.msg_name = &rem_addr;
 858        outmessage.msg_namelen = addrlen;
 859        outmessage.msg_control = outcmsg;
 860        outmessage.msg_controllen = sizeof(outcmsg);
 861        outmessage.msg_flags = MSG_EOR;
 862
 863        spin_lock(&con->writequeue_lock);
 864
 865        if (list_empty(&con->writequeue)) {
 866                spin_unlock(&con->writequeue_lock);
 867                log_print("writequeue empty for nodeid %d", con->nodeid);
 868                return;
 869        }
 870
 871        e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
 872        len = e->len;
 873        offset = e->offset;
 874        spin_unlock(&con->writequeue_lock);
 875
 876        /* Send the first block off the write queue */
 877        iov[0].iov_base = page_address(e->page)+offset;
 878        iov[0].iov_len = len;
 879
 880        cmsg = CMSG_FIRSTHDR(&outmessage);
 881        cmsg->cmsg_level = IPPROTO_SCTP;
 882        cmsg->cmsg_type = SCTP_SNDRCV;
 883        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 884        sinfo = CMSG_DATA(cmsg);
 885        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 886        sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
 887        outmessage.msg_controllen = cmsg->cmsg_len;
 888
 889        ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
 890        if (ret < 0) {
 891                log_print("Send first packet to node %d failed: %d",
 892                          con->nodeid, ret);
 893
 894                /* Try again later */
 895                clear_bit(CF_CONNECT_PENDING, &con->flags);
 896                clear_bit(CF_INIT_PENDING, &con->flags);
 897        }
 898        else {
 899                spin_lock(&con->writequeue_lock);
 900                e->offset += ret;
 901                e->len -= ret;
 902
 903                if (e->len == 0 && e->users == 0) {
 904                        list_del(&e->list);
 905                        free_entry(e);
 906                }
 907                spin_unlock(&con->writequeue_lock);
 908        }
 909}
 910
 911/* Connect a new socket to its peer */
 912static void tcp_connect_to_sock(struct connection *con)
 913{
 914        int result = -EHOSTUNREACH;
 915        struct sockaddr_storage saddr, src_addr;
 916        int addr_len;
 917        struct socket *sock = NULL;
 918
 919        if (con->nodeid == 0) {
 920                log_print("attempt to connect sock 0 foiled");
 921                return;
 922        }
 923
 924        mutex_lock(&con->sock_mutex);
 925        if (con->retries++ > MAX_CONNECT_RETRIES)
 926                goto out;
 927
 928        /* Some odd races can cause double-connects, ignore them */
 929        if (con->sock) {
 930                result = 0;
 931                goto out;
 932        }
 933
 934        /* Create a socket to communicate with */
 935        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 936                                  IPPROTO_TCP, &sock);
 937        if (result < 0)
 938                goto out_err;
 939
 940        memset(&saddr, 0, sizeof(saddr));
 941        if (dlm_nodeid_to_addr(con->nodeid, &saddr))
 942                goto out_err;
 943
 944        sock->sk->sk_user_data = con;
 945        con->rx_action = receive_from_sock;
 946        con->connect_action = tcp_connect_to_sock;
 947        add_sock(sock, con);
 948
 949        /* Bind to our cluster-known address connecting to avoid
 950           routing problems */
 951        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
 952        make_sockaddr(&src_addr, 0, &addr_len);
 953        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
 954                                 addr_len);
 955        if (result < 0) {
 956                log_print("could not bind for connect: %d", result);
 957                /* This *may* not indicate a critical error */
 958        }
 959
 960        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 961
 962        log_print("connecting to %d", con->nodeid);
 963        result =
 964                sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
 965                                   O_NONBLOCK);
 966        if (result == -EINPROGRESS)
 967                result = 0;
 968        if (result == 0)
 969                goto out;
 970
 971out_err:
 972        if (con->sock) {
 973                sock_release(con->sock);
 974                con->sock = NULL;
 975        } else if (sock) {
 976                sock_release(sock);
 977        }
 978        /*
 979         * Some errors are fatal and this list might need adjusting. For other
 980         * errors we try again until the max number of retries is reached.
 981         */
 982        if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
 983            result != -ENETDOWN && result != -EINVAL
 984            && result != -EPROTONOSUPPORT) {
 985                lowcomms_connect_sock(con);
 986                result = 0;
 987        }
 988out:
 989        mutex_unlock(&con->sock_mutex);
 990        return;
 991}
 992
 993static struct socket *tcp_create_listen_sock(struct connection *con,
 994                                             struct sockaddr_storage *saddr)
 995{
 996        struct socket *sock = NULL;
 997        int result = 0;
 998        int one = 1;
 999        int addr_len;
1000
1001        if (dlm_local_addr[0]->ss_family == AF_INET)
1002                addr_len = sizeof(struct sockaddr_in);
1003        else
1004                addr_len = sizeof(struct sockaddr_in6);
1005
1006        /* Create a socket to communicate with */
1007        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1008                                  IPPROTO_TCP, &sock);
1009        if (result < 0) {
1010                log_print("Can't create listening comms socket");
1011                goto create_out;
1012        }
1013
1014        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1015                                   (char *)&one, sizeof(one));
1016
1017        if (result < 0) {
1018                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1019        }
1020        sock->sk->sk_user_data = con;
1021        con->rx_action = tcp_accept_from_sock;
1022        con->connect_action = tcp_connect_to_sock;
1023        con->sock = sock;
1024
1025        /* Bind to our port */
1026        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1027        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1028        if (result < 0) {
1029                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1030                sock_release(sock);
1031                sock = NULL;
1032                con->sock = NULL;
1033                goto create_out;
1034        }
1035        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1036                                 (char *)&one, sizeof(one));
1037        if (result < 0) {
1038                log_print("Set keepalive failed: %d", result);
1039        }
1040
1041        result = sock->ops->listen(sock, 5);
1042        if (result < 0) {
1043                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1044                sock_release(sock);
1045                sock = NULL;
1046                goto create_out;
1047        }
1048
1049create_out:
1050        return sock;
1051}
1052
1053/* Get local addresses */
1054static void init_local(void)
1055{
1056        struct sockaddr_storage sas, *addr;
1057        int i;
1058
1059        dlm_local_count = 0;
1060        for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
1061                if (dlm_our_addr(&sas, i))
1062                        break;
1063
1064                addr = kmalloc(sizeof(*addr), GFP_NOFS);
1065                if (!addr)
1066                        break;
1067                memcpy(addr, &sas, sizeof(*addr));
1068                dlm_local_addr[dlm_local_count++] = addr;
1069        }
1070}
1071
1072/* Bind to an IP address. SCTP allows multiple address so it can do
1073   multi-homing */
1074static int add_sctp_bind_addr(struct connection *sctp_con,
1075                              struct sockaddr_storage *addr,
1076                              int addr_len, int num)
1077{
1078        int result = 0;
1079
1080        if (num == 1)
1081                result = kernel_bind(sctp_con->sock,
1082                                     (struct sockaddr *) addr,
1083                                     addr_len);
1084        else
1085                result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1086                                           SCTP_SOCKOPT_BINDX_ADD,
1087                                           (char *)addr, addr_len);
1088
1089        if (result < 0)
1090                log_print("Can't bind to port %d addr number %d",
1091                          dlm_config.ci_tcp_port, num);
1092
1093        return result;
1094}
1095
1096/* Initialise SCTP socket and bind to all interfaces */
1097static int sctp_listen_for_all(void)
1098{
1099        struct socket *sock = NULL;
1100        struct sockaddr_storage localaddr;
1101        struct sctp_event_subscribe subscribe;
1102        int result = -EINVAL, num = 1, i, addr_len;
1103        struct connection *con = nodeid2con(0, GFP_NOFS);
1104        int bufsize = NEEDED_RMEM;
1105
1106        if (!con)
1107                return -ENOMEM;
1108
1109        log_print("Using SCTP for communications");
1110
1111        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1112                                  IPPROTO_SCTP, &sock);
1113        if (result < 0) {
1114                log_print("Can't create comms socket, check SCTP is loaded");
1115                goto out;
1116        }
1117
1118        /* Listen for events */
1119        memset(&subscribe, 0, sizeof(subscribe));
1120        subscribe.sctp_data_io_event = 1;
1121        subscribe.sctp_association_event = 1;
1122        subscribe.sctp_send_failure_event = 1;
1123        subscribe.sctp_shutdown_event = 1;
1124        subscribe.sctp_partial_delivery_event = 1;
1125
1126        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1127                                 (char *)&bufsize, sizeof(bufsize));
1128        if (result)
1129                log_print("Error increasing buffer space on socket %d", result);
1130
1131        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1132                                   (char *)&subscribe, sizeof(subscribe));
1133        if (result < 0) {
1134                log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1135                          result);
1136                goto create_delsock;
1137        }
1138
1139        /* Init con struct */
1140        sock->sk->sk_user_data = con;
1141        con->sock = sock;
1142        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1143        con->rx_action = receive_from_sock;
1144        con->connect_action = sctp_init_assoc;
1145
1146        /* Bind to all interfaces. */
1147        for (i = 0; i < dlm_local_count; i++) {
1148                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1149                make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1150
1151                result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1152                if (result)
1153                        goto create_delsock;
1154                ++num;
1155        }
1156
1157        result = sock->ops->listen(sock, 5);
1158        if (result < 0) {
1159                log_print("Can't set socket listening");
1160                goto create_delsock;
1161        }
1162
1163        return 0;
1164
1165create_delsock:
1166        sock_release(sock);
1167        con->sock = NULL;
1168out:
1169        return result;
1170}
1171
1172static int tcp_listen_for_all(void)
1173{
1174        struct socket *sock = NULL;
1175        struct connection *con = nodeid2con(0, GFP_NOFS);
1176        int result = -EINVAL;
1177
1178        if (!con)
1179                return -ENOMEM;
1180
1181        /* We don't support multi-homed hosts */
1182        if (dlm_local_addr[1] != NULL) {
1183                log_print("TCP protocol can't handle multi-homed hosts, "
1184                          "try SCTP");
1185                return -EINVAL;
1186        }
1187
1188        log_print("Using TCP for communications");
1189
1190        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1191        if (sock) {
1192                add_sock(sock, con);
1193                result = 0;
1194        }
1195        else {
1196                result = -EADDRINUSE;
1197        }
1198
1199        return result;
1200}
1201
1202
1203
1204static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1205                                                     gfp_t allocation)
1206{
1207        struct writequeue_entry *entry;
1208
1209        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1210        if (!entry)
1211                return NULL;
1212
1213        entry->page = alloc_page(allocation);
1214        if (!entry->page) {
1215                kfree(entry);
1216                return NULL;
1217        }
1218
1219        entry->offset = 0;
1220        entry->len = 0;
1221        entry->end = 0;
1222        entry->users = 0;
1223        entry->con = con;
1224
1225        return entry;
1226}
1227
1228void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1229{
1230        struct connection *con;
1231        struct writequeue_entry *e;
1232        int offset = 0;
1233        int users = 0;
1234
1235        con = nodeid2con(nodeid, allocation);
1236        if (!con)
1237                return NULL;
1238
1239        spin_lock(&con->writequeue_lock);
1240        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1241        if ((&e->list == &con->writequeue) ||
1242            (PAGE_CACHE_SIZE - e->end < len)) {
1243                e = NULL;
1244        } else {
1245                offset = e->end;
1246                e->end += len;
1247                users = e->users++;
1248        }
1249        spin_unlock(&con->writequeue_lock);
1250
1251        if (e) {
1252        got_one:
1253                *ppc = page_address(e->page) + offset;
1254                return e;
1255        }
1256
1257        e = new_writequeue_entry(con, allocation);
1258        if (e) {
1259                spin_lock(&con->writequeue_lock);
1260                offset = e->end;
1261                e->end += len;
1262                users = e->users++;
1263                list_add_tail(&e->list, &con->writequeue);
1264                spin_unlock(&con->writequeue_lock);
1265                goto got_one;
1266        }
1267        return NULL;
1268}
1269
1270void dlm_lowcomms_commit_buffer(void *mh)
1271{
1272        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1273        struct connection *con = e->con;
1274        int users;
1275
1276        spin_lock(&con->writequeue_lock);
1277        users = --e->users;
1278        if (users)
1279                goto out;
1280        e->len = e->end - e->offset;
1281        spin_unlock(&con->writequeue_lock);
1282
1283        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1284                queue_work(send_workqueue, &con->swork);
1285        }
1286        return;
1287
1288out:
1289        spin_unlock(&con->writequeue_lock);
1290        return;
1291}
1292
1293/* Send a message */
1294static void send_to_sock(struct connection *con)
1295{
1296        int ret = 0;
1297        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1298        struct writequeue_entry *e;
1299        int len, offset;
1300
1301        mutex_lock(&con->sock_mutex);
1302        if (con->sock == NULL)
1303                goto out_connect;
1304
1305        spin_lock(&con->writequeue_lock);
1306        for (;;) {
1307                e = list_entry(con->writequeue.next, struct writequeue_entry,
1308                               list);
1309                if ((struct list_head *) e == &con->writequeue)
1310                        break;
1311
1312                len = e->len;
1313                offset = e->offset;
1314                BUG_ON(len == 0 && e->users == 0);
1315                spin_unlock(&con->writequeue_lock);
1316
1317                ret = 0;
1318                if (len) {
1319                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1320                                              msg_flags);
1321                        if (ret == -EAGAIN || ret == 0) {
1322                                cond_resched();
1323                                goto out;
1324                        }
1325                        if (ret <= 0)
1326                                goto send_error;
1327                }
1328                        /* Don't starve people filling buffers */
1329                        cond_resched();
1330
1331                spin_lock(&con->writequeue_lock);
1332                e->offset += ret;
1333                e->len -= ret;
1334
1335                if (e->len == 0 && e->users == 0) {
1336                        list_del(&e->list);
1337                        free_entry(e);
1338                        continue;
1339                }
1340        }
1341        spin_unlock(&con->writequeue_lock);
1342out:
1343        mutex_unlock(&con->sock_mutex);
1344        return;
1345
1346send_error:
1347        mutex_unlock(&con->sock_mutex);
1348        close_connection(con, false);
1349        lowcomms_connect_sock(con);
1350        return;
1351
1352out_connect:
1353        mutex_unlock(&con->sock_mutex);
1354        if (!test_bit(CF_INIT_PENDING, &con->flags))
1355                lowcomms_connect_sock(con);
1356        return;
1357}
1358
1359static void clean_one_writequeue(struct connection *con)
1360{
1361        struct writequeue_entry *e, *safe;
1362
1363        spin_lock(&con->writequeue_lock);
1364        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1365                list_del(&e->list);
1366                free_entry(e);
1367        }
1368        spin_unlock(&con->writequeue_lock);
1369}
1370
1371/* Called from recovery when it knows that a node has
1372   left the cluster */
1373int dlm_lowcomms_close(int nodeid)
1374{
1375        struct connection *con;
1376
1377        log_print("closing connection to node %d", nodeid);
1378        con = nodeid2con(nodeid, 0);
1379        if (con) {
1380                clear_bit(CF_CONNECT_PENDING, &con->flags);
1381                clear_bit(CF_WRITE_PENDING, &con->flags);
1382                set_bit(CF_CLOSE, &con->flags);
1383                if (cancel_work_sync(&con->swork))
1384                        log_print("canceled swork for node %d", nodeid);
1385                if (cancel_work_sync(&con->rwork))
1386                        log_print("canceled rwork for node %d", nodeid);
1387                clean_one_writequeue(con);
1388                close_connection(con, true);
1389        }
1390        return 0;
1391}
1392
1393/* Receive workqueue function */
1394static void process_recv_sockets(struct work_struct *work)
1395{
1396        struct connection *con = container_of(work, struct connection, rwork);
1397        int err;
1398
1399        clear_bit(CF_READ_PENDING, &con->flags);
1400        do {
1401                err = con->rx_action(con);
1402        } while (!err);
1403}
1404
1405/* Send workqueue function */
1406static void process_send_sockets(struct work_struct *work)
1407{
1408        struct connection *con = container_of(work, struct connection, swork);
1409
1410        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1411                con->connect_action(con);
1412                set_bit(CF_WRITE_PENDING, &con->flags);
1413        }
1414        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1415                send_to_sock(con);
1416}
1417
1418
1419/* Discard all entries on the write queues */
1420static void clean_writequeues(void)
1421{
1422        foreach_conn(clean_one_writequeue);
1423}
1424
1425static void work_stop(void)
1426{
1427        destroy_workqueue(recv_workqueue);
1428        destroy_workqueue(send_workqueue);
1429}
1430
1431static int work_start(void)
1432{
1433        int error;
1434        recv_workqueue = create_workqueue("dlm_recv");
1435        error = IS_ERR(recv_workqueue);
1436        if (error) {
1437                log_print("can't start dlm_recv %d", error);
1438                return error;
1439        }
1440
1441        send_workqueue = create_singlethread_workqueue("dlm_send");
1442        error = IS_ERR(send_workqueue);
1443        if (error) {
1444                log_print("can't start dlm_send %d", error);
1445                destroy_workqueue(recv_workqueue);
1446                return error;
1447        }
1448
1449        return 0;
1450}
1451
1452static void stop_conn(struct connection *con)
1453{
1454        con->flags |= 0x0F;
1455        if (con->sock && con->sock->sk)
1456                con->sock->sk->sk_user_data = NULL;
1457}
1458
1459static void free_conn(struct connection *con)
1460{
1461        close_connection(con, true);
1462        if (con->othercon)
1463                kmem_cache_free(con_cache, con->othercon);
1464        hlist_del(&con->list);
1465        kmem_cache_free(con_cache, con);
1466}
1467
1468void dlm_lowcomms_stop(void)
1469{
1470        /* Set all the flags to prevent any
1471           socket activity.
1472        */
1473        mutex_lock(&connections_lock);
1474        foreach_conn(stop_conn);
1475        mutex_unlock(&connections_lock);
1476
1477        work_stop();
1478
1479        mutex_lock(&connections_lock);
1480        clean_writequeues();
1481
1482        foreach_conn(free_conn);
1483
1484        mutex_unlock(&connections_lock);
1485        kmem_cache_destroy(con_cache);
1486}
1487
1488int dlm_lowcomms_start(void)
1489{
1490        int error = -EINVAL;
1491        struct connection *con;
1492        int i;
1493
1494        for (i = 0; i < CONN_HASH_SIZE; i++)
1495                INIT_HLIST_HEAD(&connection_hash[i]);
1496
1497        init_local();
1498        if (!dlm_local_count) {
1499                error = -ENOTCONN;
1500                log_print("no local IP address has been set");
1501                goto out;
1502        }
1503
1504        error = -ENOMEM;
1505        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1506                                      __alignof__(struct connection), 0,
1507                                      NULL);
1508        if (!con_cache)
1509                goto out;
1510
1511        /* Start listening */
1512        if (dlm_config.ci_protocol == 0)
1513                error = tcp_listen_for_all();
1514        else
1515                error = sctp_listen_for_all();
1516        if (error)
1517                goto fail_unlisten;
1518
1519        error = work_start();
1520        if (error)
1521                goto fail_unlisten;
1522
1523        return 0;
1524
1525fail_unlisten:
1526        con = nodeid2con(0,0);
1527        if (con) {
1528                close_connection(con, false);
1529                kmem_cache_free(con_cache, con);
1530        }
1531        kmem_cache_destroy(con_cache);
1532
1533out:
1534        return error;
1535}
1536
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.