linux/net/rds/ib_cm.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/in.h>
  35#include <linux/vmalloc.h>
  36
  37#include "rds.h"
  38#include "ib.h"
  39
  40/*
  41 * Set the selected protocol version
  42 */
  43static void rds_ib_set_protocol(struct rds_connection *conn, unsigned int version)
  44{
  45        conn->c_version = version;
  46}
  47
  48/*
  49 * Set up flow control
  50 */
  51static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
  52{
  53        struct rds_ib_connection *ic = conn->c_transport_data;
  54
  55        if (rds_ib_sysctl_flow_control && credits != 0) {
  56                /* We're doing flow control */
  57                ic->i_flowctl = 1;
  58                rds_ib_send_add_credits(conn, credits);
  59        } else {
  60                ic->i_flowctl = 0;
  61        }
  62}
  63
  64/*
  65 * Tune RNR behavior. Without flow control, we use a rather
  66 * low timeout, but not the absolute minimum - this should
  67 * be tunable.
  68 *
  69 * We already set the RNR retry count to 7 (which is the
  70 * smallest infinite number :-) above.
  71 * If flow control is off, we want to change this back to 0
  72 * so that we learn quickly when our credit accounting is
  73 * buggy.
  74 *
  75 * Caller passes in a qp_attr pointer - don't waste stack spacv
  76 * by allocation this twice.
  77 */
  78static void
  79rds_ib_tune_rnr(struct rds_ib_connection *ic, struct ib_qp_attr *attr)
  80{
  81        int ret;
  82
  83        attr->min_rnr_timer = IB_RNR_TIMER_000_32;
  84        ret = ib_modify_qp(ic->i_cm_id->qp, attr, IB_QP_MIN_RNR_TIMER);
  85        if (ret)
  86                printk(KERN_NOTICE "ib_modify_qp(IB_QP_MIN_RNR_TIMER): err=%d\n", -ret);
  87}
  88
  89/*
  90 * Connection established.
  91 * We get here for both outgoing and incoming connection.
  92 */
  93void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
  94{
  95        const struct rds_ib_connect_private *dp = NULL;
  96        struct rds_ib_connection *ic = conn->c_transport_data;
  97        struct rds_ib_device *rds_ibdev;
  98        struct ib_qp_attr qp_attr;
  99        int err;
 100
 101        if (event->param.conn.private_data_len) {
 102                dp = event->param.conn.private_data;
 103
 104                rds_ib_set_protocol(conn,
 105                                RDS_PROTOCOL(dp->dp_protocol_major,
 106                                        dp->dp_protocol_minor));
 107                rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 108        }
 109
 110        printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
 111                        &conn->c_laddr,
 112                        RDS_PROTOCOL_MAJOR(conn->c_version),
 113                        RDS_PROTOCOL_MINOR(conn->c_version),
 114                        ic->i_flowctl ? ", flow control" : "");
 115
 116        /* Tune RNR behavior */
 117        rds_ib_tune_rnr(ic, &qp_attr);
 118
 119        qp_attr.qp_state = IB_QPS_RTS;
 120        err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
 121        if (err)
 122                printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);
 123
 124        /* update ib_device with this local ipaddr & conn */
 125        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 126        err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
 127        if (err)
 128                printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err);
 129        rds_ib_add_conn(rds_ibdev, conn);
 130
 131        /* If the peer gave us the last packet it saw, process this as if
 132         * we had received a regular ACK. */
 133        if (dp && dp->dp_ack_seq)
 134                rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
 135
 136        rds_connect_complete(conn);
 137}
 138
 139static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
 140                        struct rdma_conn_param *conn_param,
 141                        struct rds_ib_connect_private *dp,
 142                        u32 protocol_version)
 143{
 144        memset(conn_param, 0, sizeof(struct rdma_conn_param));
 145        /* XXX tune these? */
 146        conn_param->responder_resources = 1;
 147        conn_param->initiator_depth = 1;
 148        conn_param->retry_count = 7;
 149        conn_param->rnr_retry_count = 7;
 150
 151        if (dp) {
 152                struct rds_ib_connection *ic = conn->c_transport_data;
 153
 154                memset(dp, 0, sizeof(*dp));
 155                dp->dp_saddr = conn->c_laddr;
 156                dp->dp_daddr = conn->c_faddr;
 157                dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
 158                dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
 159                dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
 160                dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
 161
 162                /* Advertise flow control */
 163                if (ic->i_flowctl) {
 164                        unsigned int credits;
 165
 166                        credits = IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
 167                        dp->dp_credit = cpu_to_be32(credits);
 168                        atomic_sub(IB_SET_POST_CREDITS(credits), &ic->i_credits);
 169                }
 170
 171                conn_param->private_data = dp;
 172                conn_param->private_data_len = sizeof(*dp);
 173        }
 174}
 175
 176static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 177{
 178        rdsdebug("event %u data %p\n", event->event, data);
 179}
 180
 181static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 182{
 183        struct rds_connection *conn = data;
 184        struct rds_ib_connection *ic = conn->c_transport_data;
 185
 186        rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
 187
 188        switch (event->event) {
 189        case IB_EVENT_COMM_EST:
 190                rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
 191                break;
 192        default:
 193                printk(KERN_WARNING "RDS/ib: unhandled QP event %u "
 194                       "on connection to %pI4\n", event->event,
 195                       &conn->c_faddr);
 196                break;
 197        }
 198}
 199
 200/*
 201 * This needs to be very careful to not leave IS_ERR pointers around for
 202 * cleanup to trip over.
 203 */
 204static int rds_ib_setup_qp(struct rds_connection *conn)
 205{
 206        struct rds_ib_connection *ic = conn->c_transport_data;
 207        struct ib_device *dev = ic->i_cm_id->device;
 208        struct ib_qp_init_attr attr;
 209        struct rds_ib_device *rds_ibdev;
 210        int ret;
 211
 212        /* rds_ib_add_one creates a rds_ib_device object per IB device,
 213         * and allocates a protection domain, memory range and FMR pool
 214         * for each.  If that fails for any reason, it will not register
 215         * the rds_ibdev at all.
 216         */
 217        rds_ibdev = ib_get_client_data(dev, &rds_ib_client);
 218        if (rds_ibdev == NULL) {
 219                if (printk_ratelimit())
 220                        printk(KERN_NOTICE "RDS/IB: No client_data for device %s\n",
 221                                        dev->name);
 222                return -EOPNOTSUPP;
 223        }
 224
 225        if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
 226                rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
 227        if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
 228                rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
 229
 230        /* Protection domain and memory range */
 231        ic->i_pd = rds_ibdev->pd;
 232        ic->i_mr = rds_ibdev->mr;
 233
 234        ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
 235                                     rds_ib_cq_event_handler, conn,
 236                                     ic->i_send_ring.w_nr + 1, 0);
 237        if (IS_ERR(ic->i_send_cq)) {
 238                ret = PTR_ERR(ic->i_send_cq);
 239                ic->i_send_cq = NULL;
 240                rdsdebug("ib_create_cq send failed: %d\n", ret);
 241                goto out;
 242        }
 243
 244        ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
 245                                     rds_ib_cq_event_handler, conn,
 246                                     ic->i_recv_ring.w_nr, 0);
 247        if (IS_ERR(ic->i_recv_cq)) {
 248                ret = PTR_ERR(ic->i_recv_cq);
 249                ic->i_recv_cq = NULL;
 250                rdsdebug("ib_create_cq recv failed: %d\n", ret);
 251                goto out;
 252        }
 253
 254        ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
 255        if (ret) {
 256                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
 257                goto out;
 258        }
 259
 260        ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
 261        if (ret) {
 262                rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
 263                goto out;
 264        }
 265
 266        /* XXX negotiate max send/recv with remote? */
 267        memset(&attr, 0, sizeof(attr));
 268        attr.event_handler = rds_ib_qp_event_handler;
 269        attr.qp_context = conn;
 270        /* + 1 to allow for the single ack message */
 271        attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
 272        attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
 273        attr.cap.max_send_sge = rds_ibdev->max_sge;
 274        attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
 275        attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 276        attr.qp_type = IB_QPT_RC;
 277        attr.send_cq = ic->i_send_cq;
 278        attr.recv_cq = ic->i_recv_cq;
 279
 280        /*
 281         * XXX this can fail if max_*_wr is too large?  Are we supposed
 282         * to back off until we get a value that the hardware can support?
 283         */
 284        ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
 285        if (ret) {
 286                rdsdebug("rdma_create_qp failed: %d\n", ret);
 287                goto out;
 288        }
 289
 290        ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
 291                                           ic->i_send_ring.w_nr *
 292                                                sizeof(struct rds_header),
 293                                           &ic->i_send_hdrs_dma, GFP_KERNEL);
 294        if (ic->i_send_hdrs == NULL) {
 295                ret = -ENOMEM;
 296                rdsdebug("ib_dma_alloc_coherent send failed\n");
 297                goto out;
 298        }
 299
 300        ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
 301                                           ic->i_recv_ring.w_nr *
 302                                                sizeof(struct rds_header),
 303                                           &ic->i_recv_hdrs_dma, GFP_KERNEL);
 304        if (ic->i_recv_hdrs == NULL) {
 305                ret = -ENOMEM;
 306                rdsdebug("ib_dma_alloc_coherent recv failed\n");
 307                goto out;
 308        }
 309
 310        ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
 311                                       &ic->i_ack_dma, GFP_KERNEL);
 312        if (ic->i_ack == NULL) {
 313                ret = -ENOMEM;
 314                rdsdebug("ib_dma_alloc_coherent ack failed\n");
 315                goto out;
 316        }
 317
 318        ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
 319        if (ic->i_sends == NULL) {
 320                ret = -ENOMEM;
 321                rdsdebug("send allocation failed\n");
 322                goto out;
 323        }
 324        rds_ib_send_init_ring(ic);
 325
 326        ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
 327        if (ic->i_recvs == NULL) {
 328                ret = -ENOMEM;
 329                rdsdebug("recv allocation failed\n");
 330                goto out;
 331        }
 332
 333        rds_ib_recv_init_ring(ic);
 334        rds_ib_recv_init_ack(ic);
 335
 336        /* Post receive buffers - as a side effect, this will update
 337         * the posted credit count. */
 338        rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
 339
 340        rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
 341                 ic->i_send_cq, ic->i_recv_cq);
 342
 343out:
 344        return ret;
 345}
 346
 347static u32 rds_ib_protocol_compatible(const struct rds_ib_connect_private *dp)
 348{
 349        u16 common;
 350        u32 version = 0;
 351
 352        /* rdma_cm private data is odd - when there is any private data in the
 353         * request, we will be given a pretty large buffer without telling us the
 354         * original size. The only way to tell the difference is by looking at
 355         * the contents, which are initialized to zero.
 356         * If the protocol version fields aren't set, this is a connection attempt
 357         * from an older version. This could could be 3.0 or 2.0 - we can't tell.
 358         * We really should have changed this for OFED 1.3 :-( */
 359        if (dp->dp_protocol_major == 0)
 360                return RDS_PROTOCOL_3_0;
 361
 362        common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IB_SUPPORTED_PROTOCOLS;
 363        if (dp->dp_protocol_major == 3 && common) {
 364                version = RDS_PROTOCOL_3_0;
 365                while ((common >>= 1) != 0)
 366                        version++;
 367        } else if (printk_ratelimit()) {
 368                printk(KERN_NOTICE "RDS: Connection from %pI4 using "
 369                        "incompatible protocol version %u.%u\n",
 370                        &dp->dp_saddr,
 371                        dp->dp_protocol_major,
 372                        dp->dp_protocol_minor);
 373        }
 374        return version;
 375}
 376
 377int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
 378                                    struct rdma_cm_event *event)
 379{
 380        __be64 lguid = cm_id->route.path_rec->sgid.global.interface_id;
 381        __be64 fguid = cm_id->route.path_rec->dgid.global.interface_id;
 382        const struct rds_ib_connect_private *dp = event->param.conn.private_data;
 383        struct rds_ib_connect_private dp_rep;
 384        struct rds_connection *conn = NULL;
 385        struct rds_ib_connection *ic = NULL;
 386        struct rdma_conn_param conn_param;
 387        u32 version;
 388        int err, destroy = 1;
 389
 390        /* Check whether the remote protocol version matches ours. */
 391        version = rds_ib_protocol_compatible(dp);
 392        if (!version)
 393                goto out;
 394
 395        rdsdebug("saddr %pI4 daddr %pI4 RDSv%u.%u lguid 0x%llx fguid "
 396                 "0x%llx\n", &dp->dp_saddr, &dp->dp_daddr,
 397                 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version),
 398                 (unsigned long long)be64_to_cpu(lguid),
 399                 (unsigned long long)be64_to_cpu(fguid));
 400
 401        conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport,
 402                               GFP_KERNEL);
 403        if (IS_ERR(conn)) {
 404                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
 405                conn = NULL;
 406                goto out;
 407        }
 408
 409        /*
 410         * The connection request may occur while the
 411         * previous connection exist, e.g. in case of failover.
 412         * But as connections may be initiated simultaneously
 413         * by both hosts, we have a random backoff mechanism -
 414         * see the comment above rds_queue_reconnect()
 415         */
 416        mutex_lock(&conn->c_cm_lock);
 417        if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
 418                if (rds_conn_state(conn) == RDS_CONN_UP) {
 419                        rdsdebug("incoming connect while connecting\n");
 420                        rds_conn_drop(conn);
 421                        rds_ib_stats_inc(s_ib_listen_closed_stale);
 422                } else
 423                if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
 424                        /* Wait and see - our connect may still be succeeding */
 425                        rds_ib_stats_inc(s_ib_connect_raced);
 426                }
 427                mutex_unlock(&conn->c_cm_lock);
 428                goto out;
 429        }
 430
 431        ic = conn->c_transport_data;
 432
 433        rds_ib_set_protocol(conn, version);
 434        rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 435
 436        /* If the peer gave us the last packet it saw, process this as if
 437         * we had received a regular ACK. */
 438        if (dp->dp_ack_seq)
 439                rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
 440
 441        BUG_ON(cm_id->context);
 442        BUG_ON(ic->i_cm_id);
 443
 444        ic->i_cm_id = cm_id;
 445        cm_id->context = conn;
 446
 447        /* We got halfway through setting up the ib_connection, if we
 448         * fail now, we have to take the long route out of this mess. */
 449        destroy = 0;
 450
 451        err = rds_ib_setup_qp(conn);
 452        if (err) {
 453                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", err);
 454                goto out;
 455        }
 456
 457        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
 458
 459        /* rdma_accept() calls rdma_reject() internally if it fails */
 460        err = rdma_accept(cm_id, &conn_param);
 461        mutex_unlock(&conn->c_cm_lock);
 462        if (err) {
 463                rds_ib_conn_error(conn, "rdma_accept failed (%d)\n", err);
 464                goto out;
 465        }
 466
 467        return 0;
 468
 469out:
 470        rdma_reject(cm_id, NULL, 0);
 471        return destroy;
 472}
 473
 474
 475int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
 476{
 477        struct rds_connection *conn = cm_id->context;
 478        struct rds_ib_connection *ic = conn->c_transport_data;
 479        struct rdma_conn_param conn_param;
 480        struct rds_ib_connect_private dp;
 481        int ret;
 482
 483        /* If the peer doesn't do protocol negotiation, we must
 484         * default to RDSv3.0 */
 485        rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
 486        ic->i_flowctl = rds_ib_sysctl_flow_control;     /* advertise flow control */
 487
 488        ret = rds_ib_setup_qp(conn);
 489        if (ret) {
 490                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", ret);
 491                goto out;
 492        }
 493
 494        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
 495
 496        ret = rdma_connect(cm_id, &conn_param);
 497        if (ret)
 498                rds_ib_conn_error(conn, "rdma_connect failed (%d)\n", ret);
 499
 500out:
 501        /* Beware - returning non-zero tells the rdma_cm to destroy
 502         * the cm_id. We should certainly not do it as long as we still
 503         * "own" the cm_id. */
 504        if (ret) {
 505                if (ic->i_cm_id == cm_id)
 506                        ret = 0;
 507        }
 508        return ret;
 509}
 510
 511int rds_ib_conn_connect(struct rds_connection *conn)
 512{
 513        struct rds_ib_connection *ic = conn->c_transport_data;
 514        struct sockaddr_in src, dest;
 515        int ret;
 516
 517        /* XXX I wonder what affect the port space has */
 518        /* delegate cm event handler to rdma_transport */
 519        ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
 520                                     RDMA_PS_TCP);
 521        if (IS_ERR(ic->i_cm_id)) {
 522                ret = PTR_ERR(ic->i_cm_id);
 523                ic->i_cm_id = NULL;
 524                rdsdebug("rdma_create_id() failed: %d\n", ret);
 525                goto out;
 526        }
 527
 528        rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
 529
 530        src.sin_family = AF_INET;
 531        src.sin_addr.s_addr = (__force u32)conn->c_laddr;
 532        src.sin_port = (__force u16)htons(0);
 533
 534        dest.sin_family = AF_INET;
 535        dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
 536        dest.sin_port = (__force u16)htons(RDS_PORT);
 537
 538        ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
 539                                (struct sockaddr *)&dest,
 540                                RDS_RDMA_RESOLVE_TIMEOUT_MS);
 541        if (ret) {
 542                rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
 543                         ret);
 544                rdma_destroy_id(ic->i_cm_id);
 545                ic->i_cm_id = NULL;
 546        }
 547
 548out:
 549        return ret;
 550}
 551
 552/*
 553 * This is so careful about only cleaning up resources that were built up
 554 * so that it can be called at any point during startup.  In fact it
 555 * can be called multiple times for a given connection.
 556 */
 557void rds_ib_conn_shutdown(struct rds_connection *conn)
 558{
 559        struct rds_ib_connection *ic = conn->c_transport_data;
 560        int err = 0;
 561
 562        rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
 563                 ic->i_pd, ic->i_send_cq, ic->i_recv_cq,
 564                 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
 565
 566        if (ic->i_cm_id) {
 567                struct ib_device *dev = ic->i_cm_id->device;
 568
 569                rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
 570                err = rdma_disconnect(ic->i_cm_id);
 571                if (err) {
 572                        /* Actually this may happen quite frequently, when
 573                         * an outgoing connect raced with an incoming connect.
 574                         */
 575                        rdsdebug("failed to disconnect, cm: %p err %d\n",
 576                                ic->i_cm_id, err);
 577                }
 578
 579                wait_event(rds_ib_ring_empty_wait,
 580                        rds_ib_ring_empty(&ic->i_send_ring) &&
 581                        rds_ib_ring_empty(&ic->i_recv_ring));
 582
 583                if (ic->i_send_hdrs)
 584                        ib_dma_free_coherent(dev,
 585                                           ic->i_send_ring.w_nr *
 586                                                sizeof(struct rds_header),
 587                                           ic->i_send_hdrs,
 588                                           ic->i_send_hdrs_dma);
 589
 590                if (ic->i_recv_hdrs)
 591                        ib_dma_free_coherent(dev,
 592                                           ic->i_recv_ring.w_nr *
 593                                                sizeof(struct rds_header),
 594                                           ic->i_recv_hdrs,
 595                                           ic->i_recv_hdrs_dma);
 596
 597                if (ic->i_ack)
 598                        ib_dma_free_coherent(dev, sizeof(struct rds_header),
 599                                             ic->i_ack, ic->i_ack_dma);
 600
 601                if (ic->i_sends)
 602                        rds_ib_send_clear_ring(ic);
 603                if (ic->i_recvs)
 604                        rds_ib_recv_clear_ring(ic);
 605
 606                if (ic->i_cm_id->qp)
 607                        rdma_destroy_qp(ic->i_cm_id);
 608                if (ic->i_send_cq)
 609                        ib_destroy_cq(ic->i_send_cq);
 610                if (ic->i_recv_cq)
 611                        ib_destroy_cq(ic->i_recv_cq);
 612                rdma_destroy_id(ic->i_cm_id);
 613
 614                /*
 615                 * Move connection back to the nodev list.
 616                 */
 617                if (ic->rds_ibdev)
 618                        rds_ib_remove_conn(ic->rds_ibdev, conn);
 619
 620                ic->i_cm_id = NULL;
 621                ic->i_pd = NULL;
 622                ic->i_mr = NULL;
 623                ic->i_send_cq = NULL;
 624                ic->i_recv_cq = NULL;
 625                ic->i_send_hdrs = NULL;
 626                ic->i_recv_hdrs = NULL;
 627                ic->i_ack = NULL;
 628        }
 629        BUG_ON(ic->rds_ibdev);
 630
 631        /* Clear pending transmit */
 632        if (ic->i_rm) {
 633                rds_message_put(ic->i_rm);
 634                ic->i_rm = NULL;
 635        }
 636
 637        /* Clear the ACK state */
 638        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
 639#ifdef KERNEL_HAS_ATOMIC64
 640        atomic64_set(&ic->i_ack_next, 0);
 641#else
 642        ic->i_ack_next = 0;
 643#endif
 644        ic->i_ack_recv = 0;
 645
 646        /* Clear flow control state */
 647        ic->i_flowctl = 0;
 648        atomic_set(&ic->i_credits, 0);
 649
 650        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
 651        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);
 652
 653        if (ic->i_ibinc) {
 654                rds_inc_put(&ic->i_ibinc->ii_inc);
 655                ic->i_ibinc = NULL;
 656        }
 657
 658        vfree(ic->i_sends);
 659        ic->i_sends = NULL;
 660        vfree(ic->i_recvs);
 661        ic->i_recvs = NULL;
 662}
 663
 664int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 665{
 666        struct rds_ib_connection *ic;
 667        unsigned long flags;
 668
 669        /* XXX too lazy? */
 670        ic = kzalloc(sizeof(struct rds_ib_connection), GFP_KERNEL);
 671        if (ic == NULL)
 672                return -ENOMEM;
 673
 674        INIT_LIST_HEAD(&ic->ib_node);
 675        mutex_init(&ic->i_recv_mutex);
 676#ifndef KERNEL_HAS_ATOMIC64
 677        spin_lock_init(&ic->i_ack_lock);
 678#endif
 679
 680        /*
 681         * rds_ib_conn_shutdown() waits for these to be emptied so they
 682         * must be initialized before it can be called.
 683         */
 684        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
 685        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);
 686
 687        ic->conn = conn;
 688        conn->c_transport_data = ic;
 689
 690        spin_lock_irqsave(&ib_nodev_conns_lock, flags);
 691        list_add_tail(&ic->ib_node, &ib_nodev_conns);
 692        spin_unlock_irqrestore(&ib_nodev_conns_lock, flags);
 693
 694
 695        rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
 696        return 0;
 697}
 698
 699/*
 700 * Free a connection. Connection must be shut down and not set for reconnect.
 701 */
 702void rds_ib_conn_free(void *arg)
 703{
 704        struct rds_ib_connection *ic = arg;
 705        spinlock_t      *lock_ptr;
 706
 707        rdsdebug("ic %p\n", ic);
 708
 709        /*
 710         * Conn is either on a dev's list or on the nodev list.
 711         * A race with shutdown() or connect() would cause problems
 712         * (since rds_ibdev would change) but that should never happen.
 713         */
 714        lock_ptr = ic->rds_ibdev ? &ic->rds_ibdev->spinlock : &ib_nodev_conns_lock;
 715
 716        spin_lock_irq(lock_ptr);
 717        list_del(&ic->ib_node);
 718        spin_unlock_irq(lock_ptr);
 719
 720        kfree(ic);
 721}
 722
 723
 724/*
 725 * An error occurred on the connection
 726 */
 727void
 728__rds_ib_conn_error(struct rds_connection *conn, const char *fmt, ...)
 729{
 730        va_list ap;
 731
 732        rds_conn_drop(conn);
 733
 734        va_start(ap, fmt);
 735        vprintk(fmt, ap);
 736        va_end(ap);
 737}
 738