linux/net/smc/smc_cdc.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Connection Data Control (CDC)
 * handles flow control
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/spinlock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"
#include "smc_rx.h"
#include "smc_close.h"

/********************************** send *************************************/

/* handler for send/transmission completion of a CDC msg */
static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
                               struct smc_link *link,
                               enum ib_wc_status wc_status)
{
        struct smc_cdc_tx_pend *cdcpend = (struct smc_cdc_tx_pend *)pnd_snd;
        struct smc_connection *conn = cdcpend->conn;
        struct smc_sock *smc;
        int diff;

        if (!conn)
                /* already dismissed */
                return;

        smc = container_of(conn, struct smc_sock, conn);
        bh_lock_sock(&smc->sk);
        if (!wc_status) {
                diff = smc_curs_diff(cdcpend->conn->sndbuf_desc->len,
                                     &cdcpend->conn->tx_curs_fin,
                                     &cdcpend->cursor);
                /* sndbuf_space is decreased in smc_sendmsg */
                smp_mb__before_atomic();
                atomic_add(diff, &cdcpend->conn->sndbuf_space);
                /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
                smp_mb__after_atomic();
                smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
                smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
                              conn);
                conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
        }
        smc_tx_sndbuf_nonfull(smc);
        bh_unlock_sock(&smc->sk);
}

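/* reserve a work request buffer and a CDC tx pend slot on the given link;
 * if the connection is being terminated abnormally, release the slot again
 * and report -EPIPE
 */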
int smc_cdc_get_free_slot(struct smc_connection *conn,
                          struct smc_link *link,
                          struct smc_wr_buf **wr_buf,
                          struct smc_rdma_wr **wr_rdma_buf,
                          struct smc_cdc_tx_pend **pend)
{
        int rc;

        rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
                                     wr_rdma_buf,
                                     (struct smc_wr_tx_pend_priv **)pend);
        if (conn->killed) {
                /* abnormal termination */
                if (!rc)
                        smc_wr_tx_put_slot(link,
                                           (struct smc_wr_tx_pend_priv *)(*pend));
                rc = -EPIPE;
        }
        return rc;
}

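/* record the connection and its cursor/sequence state at send time in the
 * pend slot, so the tx completion handler can advance the fin cursors and
 * the free send buffer space; the BUILD_BUG_ONs check the CDC message and
 * pend sizes against the smc_wr limits at compile time
 */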
static inline void smc_cdc_add_pending_send(struct smc_connection *conn,
                                            struct smc_cdc_tx_pend *pend)
{
        BUILD_BUG_ON_MSG(
                sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
                "must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
        BUILD_BUG_ON_MSG(
                offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
                "must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
        BUILD_BUG_ON_MSG(
                sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
                "must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)");
        pend->conn = conn;
        pend->cursor = conn->tx_curs_sent;
        pend->p_cursor = conn->local_tx_ctrl.prod;
        pend->ctrl_seq = conn->tx_cdc_seq;
}

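/* build a CDC message from the host byte order control data and post it on
 * the link; on success remember the consumer cursor confirmed to the peer,
 * on failure roll the sequence number back
 */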
int smc_cdc_msg_send(struct smc_connection *conn,
                     struct smc_wr_buf *wr_buf,
                     struct smc_cdc_tx_pend *pend)
{
        struct smc_link *link = conn->lnk;
        union smc_host_cursor cfed;
        int rc;

        smc_cdc_add_pending_send(conn, pend);

        conn->tx_cdc_seq++;
        conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
        smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
        rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
        if (!rc) {
                smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
                conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
        } else {
                conn->tx_cdc_seq--;
                conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
        }

        return rc;
}

/* send a validation msg indicating the move of a conn to another QP link */
int smcr_cdc_msg_send_validation(struct smc_connection *conn,
                                 struct smc_cdc_tx_pend *pend,
                                 struct smc_wr_buf *wr_buf)
{
        struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
        struct smc_link *link = conn->lnk;
        struct smc_cdc_msg *peer;
        int rc;

        peer = (struct smc_cdc_msg *)wr_buf;
        peer->common.type = local->common.type;
        peer->len = local->len;
        peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
        peer->token = htonl(local->token);
        peer->prod_flags.failover_validation = 1;

        rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
        return rc;
}

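/* get a tx slot on the current link and send a CDC message; if the
 * connection moved to another link in the meantime, release the slot and
 * retry once on the new link
 */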
static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        struct smc_link *link;
        bool again = false;
        int rc;

again:
        link = conn->lnk;
        rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
        if (rc)
                return rc;

        spin_lock_bh(&conn->send_lock);
        if (link != conn->lnk) {
                /* link of connection changed, try again one time */
                spin_unlock_bh(&conn->send_lock);
                smc_wr_tx_put_slot(link,
                                   (struct smc_wr_tx_pend_priv *)pend);
                if (again)
                        return -ENOLINK;
                again = true;
                goto again;
        }
        rc = smc_cdc_msg_send(conn, wr_buf, pend);
        spin_unlock_bh(&conn->send_lock);
        return rc;
}

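/* send a CDC message for a connection, dispatching to the SMC-D or SMC-R
 * variant; fails with -EPIPE without a link group or when the SMC-D peer
 * has shut down
 */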
int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
        int rc;

        if (!conn->lgr || (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
                return -EPIPE;

        if (conn->lgr->is_smcd) {
                spin_lock_bh(&conn->send_lock);
                rc = smcd_cdc_msg_send(conn);
                spin_unlock_bh(&conn->send_lock);
        } else {
                rc = smcr_cdc_get_slot_and_msg_send(conn);
        }

        return rc;
}

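/* filter callback for smc_wr_tx_dismiss_slots(): match the pending CDC
 * sends that belong to the given connection
 */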
static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
                              unsigned long data)
{
        struct smc_connection *conn = (struct smc_connection *)data;
        struct smc_cdc_tx_pend *cdc_pend =
                (struct smc_cdc_tx_pend *)tx_pend;

        return cdc_pend->conn == conn;
}

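/* dismisser callback for smc_wr_tx_dismiss_slots(): clear the conn pointer
 * so the tx completion handler treats this slot as already dismissed
 */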
static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend)
{
        struct smc_cdc_tx_pend *cdc_pend =
                (struct smc_cdc_tx_pend *)tx_pend;

        cdc_pend->conn = NULL;
}

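/* detach all pending CDC sends of a connection from their link, so tx
 * completions that arrive after the connection is dismissed are ignored
 */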
void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
{
        struct smc_link *link = conn->lnk;

        smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE,
                                smc_cdc_tx_filter, smc_cdc_tx_dismisser,
                                (unsigned long)conn);
}

/* Send an SMC-D CDC header.
 * This increments the free space available in our send buffer.
 * Also update the confirmed receive buffer with what was sent to the peer.
 */
int smcd_cdc_msg_send(struct smc_connection *conn)
{
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
        union smc_host_cursor curs;
        struct smcd_cdc_msg cdc;
        int rc, diff;

        memset(&cdc, 0, sizeof(cdc));
        cdc.common.type = SMC_CDC_MSG_TYPE;
        curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
        cdc.prod.wrap = curs.wrap;
        cdc.prod.count = curs.count;
        curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
        cdc.cons.wrap = curs.wrap;
        cdc.cons.count = curs.count;
        cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
        cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
        rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
        if (rc)
                return rc;
        smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
        conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
        /* Calculate transmitted data and increment free send buffer space */
        diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
                             &conn->tx_curs_sent);
        /* increased by confirmed number of bytes */
        smp_mb__before_atomic();
        atomic_add(diff, &conn->sndbuf_space);
        /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
        smp_mb__after_atomic();
        smc_curs_copy(&conn->tx_curs_fin, &conn->tx_curs_sent, conn);

        smc_tx_sndbuf_nonfull(smc);
        return rc;
}

/********************************* receive ***********************************/

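/* test whether seq1 is before seq2, with u16 sequence number wraparound in
 * mind; e.g. smc_cdc_before(0xFFFE, 0x0001) is true, since (s16)0xFFFD < 0
 */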
static inline bool smc_cdc_before(u16 seq1, u16 seq2)
{
        return (s16)(seq1 - seq2) < 0;
}

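/* the newly received data contains an urgent byte: remember its cursor,
 * fetch the byte from the receive buffer (it is the last byte before the
 * producer cursor, wrapping to the buffer end when count is 0) and signal
 * SIGURG to the socket owner
 */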
static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
                                            int *diff_prod)
{
        struct smc_connection *conn = &smc->conn;
        char *base;

        /* new data included urgent business */
        smc_curs_copy(&conn->urg_curs, &conn->local_rx_ctrl.prod, conn);
        conn->urg_state = SMC_URG_VALID;
        if (!sock_flag(&smc->sk, SOCK_URGINLINE))
                /* we'll skip the urgent byte, so don't account for it */
                (*diff_prod)--;
        base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
        if (conn->urg_curs.count)
                conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
        else
                conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
        sk_send_sigurg(&smc->sk);
}

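/* validate a failover_validation message that the peer sends after moving
 * the connection to another link; if its seqno was never seen before, CDC
 * messages were lost in between and the connection must be aborted
 */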
static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
                                 struct smc_link *link)
{
        struct smc_connection *conn = &smc->conn;
        u16 recv_seq = ntohs(cdc->seqno);
        s16 diff;

        /* check that seqnum was seen before */
        diff = conn->local_rx_ctrl.seqno - recv_seq;
        if (diff < 0) { /* diff larger than 0x7fff */
                /* drop connection */
                conn->out_of_sync = 1;  /* prevent any further receives */
                spin_lock_bh(&conn->send_lock);
                conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
                conn->lnk = link;
                spin_unlock_bh(&conn->send_lock);
                sock_hold(&smc->sk); /* sock_put in abort_work */
                if (!queue_work(smc_close_wq, &conn->abort_work))
                        sock_put(&smc->sk);
        }
}

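/* apply a received CDC message to the connection: update the local cursor
 * view, adjust peer_rmbe_space and bytes_to_rcv accordingly, wake readers
 * and writers, kick pending transmits, and handle peer abort/close flags
 */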
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
                                    struct smc_cdc_msg *cdc)
{
        union smc_host_cursor cons_old, prod_old;
        struct smc_connection *conn = &smc->conn;
        int diff_cons, diff_prod;

        smc_curs_copy(&prod_old, &conn->local_rx_ctrl.prod, conn);
        smc_curs_copy(&cons_old, &conn->local_rx_ctrl.cons, conn);
        smc_cdc_msg_to_host(&conn->local_rx_ctrl, cdc, conn);

        diff_cons = smc_curs_diff(conn->peer_rmbe_size, &cons_old,
                                  &conn->local_rx_ctrl.cons);
        if (diff_cons) {
                /* peer_rmbe_space is decreased during data transfer with RDMA
                 * write
                 */
                smp_mb__before_atomic();
                atomic_add(diff_cons, &conn->peer_rmbe_space);
                /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
                smp_mb__after_atomic();
        }

        diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
                                  &conn->local_rx_ctrl.prod);
        if (diff_prod) {
                if (conn->local_rx_ctrl.prod_flags.urg_data_present)
                        smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
                /* bytes_to_rcv is decreased in smc_recvmsg */
                smp_mb__before_atomic();
                atomic_add(diff_prod, &conn->bytes_to_rcv);
                /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
                smp_mb__after_atomic();
                smc->sk.sk_data_ready(&smc->sk);
        } else {
                if (conn->local_rx_ctrl.prod_flags.write_blocked)
                        smc->sk.sk_data_ready(&smc->sk);
                if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
                        conn->urg_state = SMC_URG_NOTYET;
        }

        /* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
        if ((diff_cons && smc_tx_prepared_sends(conn)) ||
            conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
            conn->local_rx_ctrl.prod_flags.urg_data_pending)
                smc_tx_sndbuf_nonempty(conn);

        if (diff_cons && conn->urg_tx_pend &&
            atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
                /* urg data confirmed by peer, indicate we're ready for more */
                conn->urg_tx_pend = false;
                smc->sk.sk_write_space(&smc->sk);
        }

        if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
                smc->sk.sk_err = ECONNRESET;
                conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
        }
        if (smc_cdc_rxed_any_close_or_senddone(conn)) {
                smc->sk.sk_shutdown |= RCV_SHUTDOWN;
                if (smc->clcsock && smc->clcsock->sk)
                        smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
                sock_set_flag(&smc->sk, SOCK_DONE);
                sock_hold(&smc->sk); /* sock_put in close_work */
                if (!queue_work(smc_close_wq, &conn->close_work))
                        sock_put(&smc->sk);
        }
}

/* called under tasklet context */
static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
{
        sock_hold(&smc->sk);
        bh_lock_sock(&smc->sk);
        smc_cdc_msg_recv_action(smc, cdc);
        bh_unlock_sock(&smc->sk);
        sock_put(&smc->sk); /* no free sk in softirq-context */
}

/* Tasklet handler for an SMC-D connection. Scheduled from the ISM device IRQ
 * handler when the peer indicates an update in the DMBE; copies the CDC
 * message out of the DMB and processes it.
 *
 * Context:
 * - tasklet context
 */
static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
{
        struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
        struct smcd_cdc_msg *data_cdc;
        struct smcd_cdc_msg cdc;
        struct smc_sock *smc;

        if (!conn || conn->killed)
                return;

        data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
        smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
        smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
        smc = container_of(conn, struct smc_sock, conn);
        smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
}

/* Initialize the receive tasklet. It is scheduled from the ISM device IRQ
 * handler whenever the peer signals new data in the DMB.
 */
void smcd_cdc_rx_init(struct smc_connection *conn)
{
        tasklet_setup(&conn->rx_tsklet, smcd_cdc_rx_tsklet);
}

/***************************** init, exit, misc ******************************/

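/* receive handler for CDC messages arriving on an SMC-R link; drops short
 * or malformed messages, looks up the connection by the token carried in
 * the message, diverts failover validation messages, and hands valid,
 * in-sequence messages to smc_cdc_msg_recv()
 */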
static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
{
        struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
        struct smc_cdc_msg *cdc = buf;
        struct smc_connection *conn;
        struct smc_link_group *lgr;
        struct smc_sock *smc;

        if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved))
                return; /* short message */
        if (cdc->len != SMC_WR_TX_SIZE)
                return; /* invalid message */

        /* lookup connection */
        lgr = smc_get_lgr(link);
        read_lock_bh(&lgr->conns_lock);
        conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
        read_unlock_bh(&lgr->conns_lock);
        if (!conn || conn->out_of_sync)
                return;
        smc = container_of(conn, struct smc_sock, conn);

        if (cdc->prod_flags.failover_validation) {
                smc_cdc_msg_validate(smc, cdc, link);
                return;
        }
        if (smc_cdc_before(ntohs(cdc->seqno),
                           conn->local_rx_ctrl.seqno))
                /* received seqno is old */
                return;

        smc_cdc_msg_recv(smc, cdc);
}

static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = {
        {
                .handler        = smc_cdc_rx_handler,
                .type           = SMC_CDC_MSG_TYPE
        },
        {
                .handler        = NULL,
        }
};

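/* register the CDC receive handlers with the smc_wr layer */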
int __init smc_cdc_init(void)
{
        struct smc_wr_rx_handler *handler;
        int rc = 0;

        for (handler = smc_cdc_rx_handlers; handler->handler; handler++) {
                INIT_HLIST_NODE(&handler->list);
                rc = smc_wr_rx_register_handler(handler);
                if (rc)
                        break;
        }
        return rc;
}