linux/net/rds/ib_cm.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/in.h>
  35#include <linux/slab.h>
  36#include <linux/vmalloc.h>
  37
  38#include "rds.h"
  39#include "ib.h"
  40
  41/*
  42 * Set the selected protocol version
  43 */
  44static void rds_ib_set_protocol(struct rds_connection *conn, unsigned int version)
  45{
  46        conn->c_version = version;
  47}
  48
  49/*
  50 * Set up flow control
  51 */
  52static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
  53{
  54        struct rds_ib_connection *ic = conn->c_transport_data;
  55
  56        if (rds_ib_sysctl_flow_control && credits != 0) {
  57                /* We're doing flow control */
  58                ic->i_flowctl = 1;
  59                rds_ib_send_add_credits(conn, credits);
  60        } else {
  61                ic->i_flowctl = 0;
  62        }
  63}
  64
  65/*
  66 * Tune RNR behavior. Without flow control, we use a rather
  67 * low timeout, but not the absolute minimum - this should
  68 * be tunable.
  69 *
  70 * We already set the RNR retry count to 7 (which is the
  71 * smallest infinite number :-) above.
  72 * If flow control is off, we want to change this back to 0
  73 * so that we learn quickly when our credit accounting is
  74 * buggy.
  75 *
  76 * Caller passes in a qp_attr pointer - don't waste stack spacv
  77 * by allocation this twice.
  78 */
  79static void
  80rds_ib_tune_rnr(struct rds_ib_connection *ic, struct ib_qp_attr *attr)
  81{
  82        int ret;
  83
  84        attr->min_rnr_timer = IB_RNR_TIMER_000_32;
  85        ret = ib_modify_qp(ic->i_cm_id->qp, attr, IB_QP_MIN_RNR_TIMER);
  86        if (ret)
  87                printk(KERN_NOTICE "ib_modify_qp(IB_QP_MIN_RNR_TIMER): err=%d\n", -ret);
  88}
  89
  90/*
  91 * Connection established.
  92 * We get here for both outgoing and incoming connection.
  93 */
  94void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
  95{
  96        const struct rds_ib_connect_private *dp = NULL;
  97        struct rds_ib_connection *ic = conn->c_transport_data;
  98        struct rds_ib_device *rds_ibdev;
  99        struct ib_qp_attr qp_attr;
 100        int err;
 101
 102        if (event->param.conn.private_data_len >= sizeof(*dp)) {
 103                dp = event->param.conn.private_data;
 104
 105                /* make sure it isn't empty data */
 106                if (dp->dp_protocol_major) {
 107                        rds_ib_set_protocol(conn,
 108                                RDS_PROTOCOL(dp->dp_protocol_major,
 109                                dp->dp_protocol_minor));
 110                        rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 111                }
 112        }
 113
 114        printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
 115                        &conn->c_faddr,
 116                        RDS_PROTOCOL_MAJOR(conn->c_version),
 117                        RDS_PROTOCOL_MINOR(conn->c_version),
 118                        ic->i_flowctl ? ", flow control" : "");
 119
 120        /*
 121         * Init rings and fill recv. this needs to wait until protocol negotiation
 122         * is complete, since ring layout is different from 3.0 to 3.1.
 123         */
 124        rds_ib_send_init_ring(ic);
 125        rds_ib_recv_init_ring(ic);
 126        /* Post receive buffers - as a side effect, this will update
 127         * the posted credit count. */
 128        rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
 129
 130        /* Tune RNR behavior */
 131        rds_ib_tune_rnr(ic, &qp_attr);
 132
 133        qp_attr.qp_state = IB_QPS_RTS;
 134        err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
 135        if (err)
 136                printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);
 137
 138        /* update ib_device with this local ipaddr & conn */
 139        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 140        err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
 141        if (err)
 142                printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err);
 143        rds_ib_add_conn(rds_ibdev, conn);
 144
 145        /* If the peer gave us the last packet it saw, process this as if
 146         * we had received a regular ACK. */
 147        if (dp && dp->dp_ack_seq)
 148                rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
 149
 150        rds_connect_complete(conn);
 151}
 152
 153static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
 154                        struct rdma_conn_param *conn_param,
 155                        struct rds_ib_connect_private *dp,
 156                        u32 protocol_version)
 157{
 158        memset(conn_param, 0, sizeof(struct rdma_conn_param));
 159        /* XXX tune these? */
 160        conn_param->responder_resources = 1;
 161        conn_param->initiator_depth = 1;
 162        conn_param->retry_count = min_t(unsigned int, rds_ib_retry_count, 7);
 163        conn_param->rnr_retry_count = 7;
 164
 165        if (dp) {
 166                struct rds_ib_connection *ic = conn->c_transport_data;
 167
 168                memset(dp, 0, sizeof(*dp));
 169                dp->dp_saddr = conn->c_laddr;
 170                dp->dp_daddr = conn->c_faddr;
 171                dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
 172                dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
 173                dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
 174                dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
 175
 176                /* Advertise flow control */
 177                if (ic->i_flowctl) {
 178                        unsigned int credits;
 179
 180                        credits = IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
 181                        dp->dp_credit = cpu_to_be32(credits);
 182                        atomic_sub(IB_SET_POST_CREDITS(credits), &ic->i_credits);
 183                }
 184
 185                conn_param->private_data = dp;
 186                conn_param->private_data_len = sizeof(*dp);
 187        }
 188}
 189
 190static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 191{
 192        rdsdebug("event %u data %p\n", event->event, data);
 193}
 194
 195static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 196{
 197        struct rds_connection *conn = data;
 198        struct rds_ib_connection *ic = conn->c_transport_data;
 199
 200        rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
 201
 202        switch (event->event) {
 203        case IB_EVENT_COMM_EST:
 204                rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
 205                break;
 206        default:
 207                rdsdebug("Fatal QP Event %u "
 208                        "- connection %pI4->%pI4, reconnecting\n",
 209                        event->event, &conn->c_laddr, &conn->c_faddr);
 210                rds_conn_drop(conn);
 211                break;
 212        }
 213}
 214
 215/*
 216 * This needs to be very careful to not leave IS_ERR pointers around for
 217 * cleanup to trip over.
 218 */
 219static int rds_ib_setup_qp(struct rds_connection *conn)
 220{
 221        struct rds_ib_connection *ic = conn->c_transport_data;
 222        struct ib_device *dev = ic->i_cm_id->device;
 223        struct ib_qp_init_attr attr;
 224        struct rds_ib_device *rds_ibdev;
 225        int ret;
 226
 227        /* rds_ib_add_one creates a rds_ib_device object per IB device,
 228         * and allocates a protection domain, memory range and FMR pool
 229         * for each.  If that fails for any reason, it will not register
 230         * the rds_ibdev at all.
 231         */
 232        rds_ibdev = ib_get_client_data(dev, &rds_ib_client);
 233        if (rds_ibdev == NULL) {
 234                if (printk_ratelimit())
 235                        printk(KERN_NOTICE "RDS/IB: No client_data for device %s\n",
 236                                        dev->name);
 237                return -EOPNOTSUPP;
 238        }
 239
 240        if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
 241                rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
 242        if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
 243                rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
 244
 245        /* Protection domain and memory range */
 246        ic->i_pd = rds_ibdev->pd;
 247        ic->i_mr = rds_ibdev->mr;
 248
 249        ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
 250                                     rds_ib_cq_event_handler, conn,
 251                                     ic->i_send_ring.w_nr + 1, 0);
 252        if (IS_ERR(ic->i_send_cq)) {
 253                ret = PTR_ERR(ic->i_send_cq);
 254                ic->i_send_cq = NULL;
 255                rdsdebug("ib_create_cq send failed: %d\n", ret);
 256                goto out;
 257        }
 258
 259        ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
 260                                     rds_ib_cq_event_handler, conn,
 261                                     ic->i_recv_ring.w_nr, 0);
 262        if (IS_ERR(ic->i_recv_cq)) {
 263                ret = PTR_ERR(ic->i_recv_cq);
 264                ic->i_recv_cq = NULL;
 265                rdsdebug("ib_create_cq recv failed: %d\n", ret);
 266                goto out;
 267        }
 268
 269        ret = ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
 270        if (ret) {
 271                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
 272                goto out;
 273        }
 274
 275        ret = ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
 276        if (ret) {
 277                rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
 278                goto out;
 279        }
 280
 281        /* XXX negotiate max send/recv with remote? */
 282        memset(&attr, 0, sizeof(attr));
 283        attr.event_handler = rds_ib_qp_event_handler;
 284        attr.qp_context = conn;
 285        /* + 1 to allow for the single ack message */
 286        attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
 287        attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
 288        attr.cap.max_send_sge = rds_ibdev->max_sge;
 289        attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
 290        attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 291        attr.qp_type = IB_QPT_RC;
 292        attr.send_cq = ic->i_send_cq;
 293        attr.recv_cq = ic->i_recv_cq;
 294
 295        /*
 296         * XXX this can fail if max_*_wr is too large?  Are we supposed
 297         * to back off until we get a value that the hardware can support?
 298         */
 299        ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
 300        if (ret) {
 301                rdsdebug("rdma_create_qp failed: %d\n", ret);
 302                goto out;
 303        }
 304
 305        ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
 306                                           ic->i_send_ring.w_nr *
 307                                                sizeof(struct rds_header),
 308                                           &ic->i_send_hdrs_dma, GFP_KERNEL);
 309        if (ic->i_send_hdrs == NULL) {
 310                ret = -ENOMEM;
 311                rdsdebug("ib_dma_alloc_coherent send failed\n");
 312                goto out;
 313        }
 314
 315        ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
 316                                           ic->i_recv_ring.w_nr *
 317                                                sizeof(struct rds_header),
 318                                           &ic->i_recv_hdrs_dma, GFP_KERNEL);
 319        if (ic->i_recv_hdrs == NULL) {
 320                ret = -ENOMEM;
 321                rdsdebug("ib_dma_alloc_coherent recv failed\n");
 322                goto out;
 323        }
 324
 325        ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
 326                                       &ic->i_ack_dma, GFP_KERNEL);
 327        if (ic->i_ack == NULL) {
 328                ret = -ENOMEM;
 329                rdsdebug("ib_dma_alloc_coherent ack failed\n");
 330                goto out;
 331        }
 332
 333        ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
 334        if (ic->i_sends == NULL) {
 335                ret = -ENOMEM;
 336                rdsdebug("send allocation failed\n");
 337                goto out;
 338        }
 339        memset(ic->i_sends, 0, ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
 340
 341        ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
 342        if (ic->i_recvs == NULL) {
 343                ret = -ENOMEM;
 344                rdsdebug("recv allocation failed\n");
 345                goto out;
 346        }
 347        memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
 348
 349        rds_ib_recv_init_ack(ic);
 350
 351        rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
 352                 ic->i_send_cq, ic->i_recv_cq);
 353
 354out:
 355        return ret;
 356}
 357
 358static u32 rds_ib_protocol_compatible(struct rdma_cm_event *event)
 359{
 360        const struct rds_ib_connect_private *dp = event->param.conn.private_data;
 361        u16 common;
 362        u32 version = 0;
 363
 364        /*
 365         * rdma_cm private data is odd - when there is any private data in the
 366         * request, we will be given a pretty large buffer without telling us the
 367         * original size. The only way to tell the difference is by looking at
 368         * the contents, which are initialized to zero.
 369         * If the protocol version fields aren't set, this is a connection attempt
 370         * from an older version. This could could be 3.0 or 2.0 - we can't tell.
 371         * We really should have changed this for OFED 1.3 :-(
 372         */
 373
 374        /* Be paranoid. RDS always has privdata */
 375        if (!event->param.conn.private_data_len) {
 376                printk(KERN_NOTICE "RDS incoming connection has no private data, "
 377                        "rejecting\n");
 378                return 0;
 379        }
 380
 381        /* Even if len is crap *now* I still want to check it. -ASG */
 382        if (event->param.conn.private_data_len < sizeof (*dp) ||
 383            dp->dp_protocol_major == 0)
 384                return RDS_PROTOCOL_3_0;
 385
 386        common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IB_SUPPORTED_PROTOCOLS;
 387        if (dp->dp_protocol_major == 3 && common) {
 388                version = RDS_PROTOCOL_3_0;
 389                while ((common >>= 1) != 0)
 390                        version++;
 391        } else if (printk_ratelimit()) {
 392                printk(KERN_NOTICE "RDS: Connection from %pI4 using "
 393                        "incompatible protocol version %u.%u\n",
 394                        &dp->dp_saddr,
 395                        dp->dp_protocol_major,
 396                        dp->dp_protocol_minor);
 397        }
 398        return version;
 399}
 400
 401int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
 402                                    struct rdma_cm_event *event)
 403{
 404        __be64 lguid = cm_id->route.path_rec->sgid.global.interface_id;
 405        __be64 fguid = cm_id->route.path_rec->dgid.global.interface_id;
 406        const struct rds_ib_connect_private *dp = event->param.conn.private_data;
 407        struct rds_ib_connect_private dp_rep;
 408        struct rds_connection *conn = NULL;
 409        struct rds_ib_connection *ic = NULL;
 410        struct rdma_conn_param conn_param;
 411        u32 version;
 412        int err, destroy = 1;
 413
 414        /* Check whether the remote protocol version matches ours. */
 415        version = rds_ib_protocol_compatible(event);
 416        if (!version)
 417                goto out;
 418
 419        rdsdebug("saddr %pI4 daddr %pI4 RDSv%u.%u lguid 0x%llx fguid "
 420                 "0x%llx\n", &dp->dp_saddr, &dp->dp_daddr,
 421                 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version),
 422                 (unsigned long long)be64_to_cpu(lguid),
 423                 (unsigned long long)be64_to_cpu(fguid));
 424
 425        conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport,
 426                               GFP_KERNEL);
 427        if (IS_ERR(conn)) {
 428                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
 429                conn = NULL;
 430                goto out;
 431        }
 432
 433        /*
 434         * The connection request may occur while the
 435         * previous connection exist, e.g. in case of failover.
 436         * But as connections may be initiated simultaneously
 437         * by both hosts, we have a random backoff mechanism -
 438         * see the comment above rds_queue_reconnect()
 439         */
 440        mutex_lock(&conn->c_cm_lock);
 441        if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
 442                if (rds_conn_state(conn) == RDS_CONN_UP) {
 443                        rdsdebug("incoming connect while connecting\n");
 444                        rds_conn_drop(conn);
 445                        rds_ib_stats_inc(s_ib_listen_closed_stale);
 446                } else
 447                if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
 448                        /* Wait and see - our connect may still be succeeding */
 449                        rds_ib_stats_inc(s_ib_connect_raced);
 450                }
 451                mutex_unlock(&conn->c_cm_lock);
 452                goto out;
 453        }
 454
 455        ic = conn->c_transport_data;
 456
 457        rds_ib_set_protocol(conn, version);
 458        rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
 459
 460        /* If the peer gave us the last packet it saw, process this as if
 461         * we had received a regular ACK. */
 462        if (dp->dp_ack_seq)
 463                rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
 464
 465        BUG_ON(cm_id->context);
 466        BUG_ON(ic->i_cm_id);
 467
 468        ic->i_cm_id = cm_id;
 469        cm_id->context = conn;
 470
 471        /* We got halfway through setting up the ib_connection, if we
 472         * fail now, we have to take the long route out of this mess. */
 473        destroy = 0;
 474
 475        err = rds_ib_setup_qp(conn);
 476        if (err) {
 477                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", err);
 478                mutex_unlock(&conn->c_cm_lock);
 479                goto out;
 480        }
 481
 482        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
 483
 484        /* rdma_accept() calls rdma_reject() internally if it fails */
 485        err = rdma_accept(cm_id, &conn_param);
 486        mutex_unlock(&conn->c_cm_lock);
 487        if (err) {
 488                rds_ib_conn_error(conn, "rdma_accept failed (%d)\n", err);
 489                goto out;
 490        }
 491
 492        return 0;
 493
 494out:
 495        rdma_reject(cm_id, NULL, 0);
 496        return destroy;
 497}
 498
 499
 500int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
 501{
 502        struct rds_connection *conn = cm_id->context;
 503        struct rds_ib_connection *ic = conn->c_transport_data;
 504        struct rdma_conn_param conn_param;
 505        struct rds_ib_connect_private dp;
 506        int ret;
 507
 508        /* If the peer doesn't do protocol negotiation, we must
 509         * default to RDSv3.0 */
 510        rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
 511        ic->i_flowctl = rds_ib_sysctl_flow_control;     /* advertise flow control */
 512
 513        ret = rds_ib_setup_qp(conn);
 514        if (ret) {
 515                rds_ib_conn_error(conn, "rds_ib_setup_qp failed (%d)\n", ret);
 516                goto out;
 517        }
 518
 519        rds_ib_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
 520
 521        ret = rdma_connect(cm_id, &conn_param);
 522        if (ret)
 523                rds_ib_conn_error(conn, "rdma_connect failed (%d)\n", ret);
 524
 525out:
 526        /* Beware - returning non-zero tells the rdma_cm to destroy
 527         * the cm_id. We should certainly not do it as long as we still
 528         * "own" the cm_id. */
 529        if (ret) {
 530                if (ic->i_cm_id == cm_id)
 531                        ret = 0;
 532        }
 533        return ret;
 534}
 535
 536int rds_ib_conn_connect(struct rds_connection *conn)
 537{
 538        struct rds_ib_connection *ic = conn->c_transport_data;
 539        struct sockaddr_in src, dest;
 540        int ret;
 541
 542        /* XXX I wonder what affect the port space has */
 543        /* delegate cm event handler to rdma_transport */
 544        ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
 545                                     RDMA_PS_TCP);
 546        if (IS_ERR(ic->i_cm_id)) {
 547                ret = PTR_ERR(ic->i_cm_id);
 548                ic->i_cm_id = NULL;
 549                rdsdebug("rdma_create_id() failed: %d\n", ret);
 550                goto out;
 551        }
 552
 553        rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
 554
 555        src.sin_family = AF_INET;
 556        src.sin_addr.s_addr = (__force u32)conn->c_laddr;
 557        src.sin_port = (__force u16)htons(0);
 558
 559        dest.sin_family = AF_INET;
 560        dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
 561        dest.sin_port = (__force u16)htons(RDS_PORT);
 562
 563        ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
 564                                (struct sockaddr *)&dest,
 565                                RDS_RDMA_RESOLVE_TIMEOUT_MS);
 566        if (ret) {
 567                rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
 568                         ret);
 569                rdma_destroy_id(ic->i_cm_id);
 570                ic->i_cm_id = NULL;
 571        }
 572
 573out:
 574        return ret;
 575}
 576
 577/*
 578 * This is so careful about only cleaning up resources that were built up
 579 * so that it can be called at any point during startup.  In fact it
 580 * can be called multiple times for a given connection.
 581 */
 582void rds_ib_conn_shutdown(struct rds_connection *conn)
 583{
 584        struct rds_ib_connection *ic = conn->c_transport_data;
 585        int err = 0;
 586
 587        rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
 588                 ic->i_pd, ic->i_send_cq, ic->i_recv_cq,
 589                 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
 590
 591        if (ic->i_cm_id) {
 592                struct ib_device *dev = ic->i_cm_id->device;
 593
 594                rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
 595                err = rdma_disconnect(ic->i_cm_id);
 596                if (err) {
 597                        /* Actually this may happen quite frequently, when
 598                         * an outgoing connect raced with an incoming connect.
 599                         */
 600                        rdsdebug("failed to disconnect, cm: %p err %d\n",
 601                                ic->i_cm_id, err);
 602                }
 603
 604                wait_event(rds_ib_ring_empty_wait,
 605                        rds_ib_ring_empty(&ic->i_send_ring) &&
 606                        rds_ib_ring_empty(&ic->i_recv_ring));
 607
 608                if (ic->i_send_hdrs)
 609                        ib_dma_free_coherent(dev,
 610                                           ic->i_send_ring.w_nr *
 611                                                sizeof(struct rds_header),
 612                                           ic->i_send_hdrs,
 613                                           ic->i_send_hdrs_dma);
 614
 615                if (ic->i_recv_hdrs)
 616                        ib_dma_free_coherent(dev,
 617                                           ic->i_recv_ring.w_nr *
 618                                                sizeof(struct rds_header),
 619                                           ic->i_recv_hdrs,
 620                                           ic->i_recv_hdrs_dma);
 621
 622                if (ic->i_ack)
 623                        ib_dma_free_coherent(dev, sizeof(struct rds_header),
 624                                             ic->i_ack, ic->i_ack_dma);
 625
 626                if (ic->i_sends)
 627                        rds_ib_send_clear_ring(ic);
 628                if (ic->i_recvs)
 629                        rds_ib_recv_clear_ring(ic);
 630
 631                if (ic->i_cm_id->qp)
 632                        rdma_destroy_qp(ic->i_cm_id);
 633                if (ic->i_send_cq)
 634                        ib_destroy_cq(ic->i_send_cq);
 635                if (ic->i_recv_cq)
 636                        ib_destroy_cq(ic->i_recv_cq);
 637                rdma_destroy_id(ic->i_cm_id);
 638
 639                /*
 640                 * Move connection back to the nodev list.
 641                 */
 642                if (ic->rds_ibdev)
 643                        rds_ib_remove_conn(ic->rds_ibdev, conn);
 644
 645                ic->i_cm_id = NULL;
 646                ic->i_pd = NULL;
 647                ic->i_mr = NULL;
 648                ic->i_send_cq = NULL;
 649                ic->i_recv_cq = NULL;
 650                ic->i_send_hdrs = NULL;
 651                ic->i_recv_hdrs = NULL;
 652                ic->i_ack = NULL;
 653        }
 654        BUG_ON(ic->rds_ibdev);
 655
 656        /* Clear pending transmit */
 657        if (ic->i_rm) {
 658                rds_message_put(ic->i_rm);
 659                ic->i_rm = NULL;
 660        }
 661
 662        /* Clear the ACK state */
 663        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
 664#ifdef KERNEL_HAS_ATOMIC64
 665        atomic64_set(&ic->i_ack_next, 0);
 666#else
 667        ic->i_ack_next = 0;
 668#endif
 669        ic->i_ack_recv = 0;
 670
 671        /* Clear flow control state */
 672        ic->i_flowctl = 0;
 673        atomic_set(&ic->i_credits, 0);
 674
 675        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
 676        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);
 677
 678        if (ic->i_ibinc) {
 679                rds_inc_put(&ic->i_ibinc->ii_inc);
 680                ic->i_ibinc = NULL;
 681        }
 682
 683        vfree(ic->i_sends);
 684        ic->i_sends = NULL;
 685        vfree(ic->i_recvs);
 686        ic->i_recvs = NULL;
 687}
 688
 689int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 690{
 691        struct rds_ib_connection *ic;
 692        unsigned long flags;
 693
 694        /* XXX too lazy? */
 695        ic = kzalloc(sizeof(struct rds_ib_connection), GFP_KERNEL);
 696        if (ic == NULL)
 697                return -ENOMEM;
 698
 699        INIT_LIST_HEAD(&ic->ib_node);
 700        tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
 701                     (unsigned long) ic);
 702        mutex_init(&ic->i_recv_mutex);
 703#ifndef KERNEL_HAS_ATOMIC64
 704        spin_lock_init(&ic->i_ack_lock);
 705#endif
 706
 707        /*
 708         * rds_ib_conn_shutdown() waits for these to be emptied so they
 709         * must be initialized before it can be called.
 710         */
 711        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
 712        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);
 713
 714        ic->conn = conn;
 715        conn->c_transport_data = ic;
 716
 717        spin_lock_irqsave(&ib_nodev_conns_lock, flags);
 718        list_add_tail(&ic->ib_node, &ib_nodev_conns);
 719        spin_unlock_irqrestore(&ib_nodev_conns_lock, flags);
 720
 721
 722        rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
 723        return 0;
 724}
 725
 726/*
 727 * Free a connection. Connection must be shut down and not set for reconnect.
 728 */
 729void rds_ib_conn_free(void *arg)
 730{
 731        struct rds_ib_connection *ic = arg;
 732        spinlock_t      *lock_ptr;
 733
 734        rdsdebug("ic %p\n", ic);
 735
 736        /*
 737         * Conn is either on a dev's list or on the nodev list.
 738         * A race with shutdown() or connect() would cause problems
 739         * (since rds_ibdev would change) but that should never happen.
 740         */
 741        lock_ptr = ic->rds_ibdev ? &ic->rds_ibdev->spinlock : &ib_nodev_conns_lock;
 742
 743        spin_lock_irq(lock_ptr);
 744        list_del(&ic->ib_node);
 745        spin_unlock_irq(lock_ptr);
 746
 747        kfree(ic);
 748}
 749
 750
 751/*
 752 * An error occurred on the connection
 753 */
 754void
 755__rds_ib_conn_error(struct rds_connection *conn, const char *fmt, ...)
 756{
 757        va_list ap;
 758
 759        rds_conn_drop(conn);
 760
 761        va_start(ap, fmt);
 762        vprintk(fmt, ap);
 763        va_end(ap);
 764}
 765