linux/net/rxrpc/ar-ack.c
<<
>>
Prefs
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/slab.h>
  17#include <linux/udp.h>
  18#include <net/sock.h>
  19#include <net/af_rxrpc.h>
  20#include "ar-internal.h"
  21
  22static unsigned rxrpc_ack_defer = 1;
  23
  24static const char *const rxrpc_acks[] = {
  25        "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
  26        "-?-"
  27};
  28
  29static const s8 rxrpc_ack_priority[] = {
  30        [0]                             = 0,
  31        [RXRPC_ACK_DELAY]               = 1,
  32        [RXRPC_ACK_REQUESTED]           = 2,
  33        [RXRPC_ACK_IDLE]                = 3,
  34        [RXRPC_ACK_PING_RESPONSE]       = 4,
  35        [RXRPC_ACK_DUPLICATE]           = 5,
  36        [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
  37        [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
  38        [RXRPC_ACK_NOSPACE]             = 8,
  39};
  40
  41/*
  42 * propose an ACK be sent
  43 */
  44void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  45                         __be32 serial, bool immediate)
  46{
  47        unsigned long expiry;
  48        s8 prior = rxrpc_ack_priority[ack_reason];
  49
  50        ASSERTCMP(prior, >, 0);
  51
  52        _enter("{%d},%s,%%%x,%u",
  53               call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
  54               immediate);
  55
  56        if (prior < rxrpc_ack_priority[call->ackr_reason]) {
  57                if (immediate)
  58                        goto cancel_timer;
  59                return;
  60        }
  61
  62        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
  63         * numbers */
  64        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
  65                if (prior <= 4)
  66                        call->ackr_serial = serial;
  67                if (immediate)
  68                        goto cancel_timer;
  69                return;
  70        }
  71
  72        call->ackr_reason = ack_reason;
  73        call->ackr_serial = serial;
  74
  75        switch (ack_reason) {
  76        case RXRPC_ACK_DELAY:
  77                _debug("run delay timer");
  78                call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
  79                add_timer(&call->ack_timer);
  80                return;
  81
  82        case RXRPC_ACK_IDLE:
  83                if (!immediate) {
  84                        _debug("run defer timer");
  85                        expiry = 1;
  86                        goto run_timer;
  87                }
  88                goto cancel_timer;
  89
  90        case RXRPC_ACK_REQUESTED:
  91                if (!rxrpc_ack_defer)
  92                        goto cancel_timer;
  93                if (!immediate || serial == cpu_to_be32(1)) {
  94                        _debug("run defer timer");
  95                        expiry = rxrpc_ack_defer;
  96                        goto run_timer;
  97                }
  98
  99        default:
 100                _debug("immediate ACK");
 101                goto cancel_timer;
 102        }
 103
 104run_timer:
 105        expiry += jiffies;
 106        if (!timer_pending(&call->ack_timer) ||
 107            time_after(call->ack_timer.expires, expiry))
 108                mod_timer(&call->ack_timer, expiry);
 109        return;
 110
 111cancel_timer:
 112        _debug("cancel timer %%%u", ntohl(serial));
 113        try_to_del_timer_sync(&call->ack_timer);
 114        read_lock_bh(&call->state_lock);
 115        if (call->state <= RXRPC_CALL_COMPLETE &&
 116            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 117                rxrpc_queue_call(call);
 118        read_unlock_bh(&call->state_lock);
 119}
 120
 121/*
 122 * propose an ACK be sent, locking the call structure
 123 */
 124void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 125                       __be32 serial, bool immediate)
 126{
 127        s8 prior = rxrpc_ack_priority[ack_reason];
 128
 129        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 130                spin_lock_bh(&call->lock);
 131                __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 132                spin_unlock_bh(&call->lock);
 133        }
 134}
 135
 136/*
 137 * set the resend timer
 138 */
 139static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 140                             unsigned long resend_at)
 141{
 142        read_lock_bh(&call->state_lock);
 143        if (call->state >= RXRPC_CALL_COMPLETE)
 144                resend = 0;
 145
 146        if (resend & 1) {
 147                _debug("SET RESEND");
 148                set_bit(RXRPC_CALL_RESEND, &call->events);
 149        }
 150
 151        if (resend & 2) {
 152                _debug("MODIFY RESEND TIMER");
 153                set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 154                mod_timer(&call->resend_timer, resend_at);
 155        } else {
 156                _debug("KILL RESEND TIMER");
 157                del_timer_sync(&call->resend_timer);
 158                clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 159                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 160        }
 161        read_unlock_bh(&call->state_lock);
 162}
 163
 164/*
 165 * resend packets
 166 */
 167static void rxrpc_resend(struct rxrpc_call *call)
 168{
 169        struct rxrpc_skb_priv *sp;
 170        struct rxrpc_header *hdr;
 171        struct sk_buff *txb;
 172        unsigned long *p_txb, resend_at;
 173        int loop, stop;
 174        u8 resend;
 175
 176        _enter("{%d,%d,%d,%d},",
 177               call->acks_hard, call->acks_unacked,
 178               atomic_read(&call->sequence),
 179               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 180
 181        stop = 0;
 182        resend = 0;
 183        resend_at = 0;
 184
 185        for (loop = call->acks_tail;
 186             loop != call->acks_head || stop;
 187             loop = (loop + 1) &  (call->acks_winsz - 1)
 188             ) {
 189                p_txb = call->acks_window + loop;
 190                smp_read_barrier_depends();
 191                if (*p_txb & 1)
 192                        continue;
 193
 194                txb = (struct sk_buff *) *p_txb;
 195                sp = rxrpc_skb(txb);
 196
 197                if (sp->need_resend) {
 198                        sp->need_resend = 0;
 199
 200                        /* each Tx packet has a new serial number */
 201                        sp->hdr.serial =
 202                                htonl(atomic_inc_return(&call->conn->serial));
 203
 204                        hdr = (struct rxrpc_header *) txb->head;
 205                        hdr->serial = sp->hdr.serial;
 206
 207                        _proto("Tx DATA %%%u { #%d }",
 208                               ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 209                        if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 210                                stop = 0;
 211                                sp->resend_at = jiffies + 3;
 212                        } else {
 213                                sp->resend_at =
 214                                        jiffies + rxrpc_resend_timeout * HZ;
 215                        }
 216                }
 217
 218                if (time_after_eq(jiffies + 1, sp->resend_at)) {
 219                        sp->need_resend = 1;
 220                        resend |= 1;
 221                } else if (resend & 2) {
 222                        if (time_before(sp->resend_at, resend_at))
 223                                resend_at = sp->resend_at;
 224                } else {
 225                        resend_at = sp->resend_at;
 226                        resend |= 2;
 227                }
 228        }
 229
 230        rxrpc_set_resend(call, resend, resend_at);
 231        _leave("");
 232}
 233
 234/*
 235 * handle resend timer expiry
 236 */
 237static void rxrpc_resend_timer(struct rxrpc_call *call)
 238{
 239        struct rxrpc_skb_priv *sp;
 240        struct sk_buff *txb;
 241        unsigned long *p_txb, resend_at;
 242        int loop;
 243        u8 resend;
 244
 245        _enter("%d,%d,%d",
 246               call->acks_tail, call->acks_unacked, call->acks_head);
 247
 248        if (call->state >= RXRPC_CALL_COMPLETE)
 249                return;
 250
 251        resend = 0;
 252        resend_at = 0;
 253
 254        for (loop = call->acks_unacked;
 255             loop != call->acks_head;
 256             loop = (loop + 1) &  (call->acks_winsz - 1)
 257             ) {
 258                p_txb = call->acks_window + loop;
 259                smp_read_barrier_depends();
 260                txb = (struct sk_buff *) (*p_txb & ~1);
 261                sp = rxrpc_skb(txb);
 262
 263                ASSERT(!(*p_txb & 1));
 264
 265                if (sp->need_resend) {
 266                        ;
 267                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 268                        sp->need_resend = 1;
 269                        resend |= 1;
 270                } else if (resend & 2) {
 271                        if (time_before(sp->resend_at, resend_at))
 272                                resend_at = sp->resend_at;
 273                } else {
 274                        resend_at = sp->resend_at;
 275                        resend |= 2;
 276                }
 277        }
 278
 279        rxrpc_set_resend(call, resend, resend_at);
 280        _leave("");
 281}
 282
 283/*
 284 * process soft ACKs of our transmitted packets
 285 * - these indicate packets the peer has or has not received, but hasn't yet
 286 *   given to the consumer, and so can still be discarded and re-requested
 287 */
 288static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 289                                   struct rxrpc_ackpacket *ack,
 290                                   struct sk_buff *skb)
 291{
 292        struct rxrpc_skb_priv *sp;
 293        struct sk_buff *txb;
 294        unsigned long *p_txb, resend_at;
 295        int loop;
 296        u8 sacks[RXRPC_MAXACKS], resend;
 297
 298        _enter("{%d,%d},{%d},",
 299               call->acks_hard,
 300               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 301               ack->nAcks);
 302
 303        if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 304                goto protocol_error;
 305
 306        resend = 0;
 307        resend_at = 0;
 308        for (loop = 0; loop < ack->nAcks; loop++) {
 309                p_txb = call->acks_window;
 310                p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 311                smp_read_barrier_depends();
 312                txb = (struct sk_buff *) (*p_txb & ~1);
 313                sp = rxrpc_skb(txb);
 314
 315                switch (sacks[loop]) {
 316                case RXRPC_ACK_TYPE_ACK:
 317                        sp->need_resend = 0;
 318                        *p_txb |= 1;
 319                        break;
 320                case RXRPC_ACK_TYPE_NACK:
 321                        sp->need_resend = 1;
 322                        *p_txb &= ~1;
 323                        resend = 1;
 324                        break;
 325                default:
 326                        _debug("Unsupported ACK type %d", sacks[loop]);
 327                        goto protocol_error;
 328                }
 329        }
 330
 331        smp_mb();
 332        call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 333
 334        /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 335         * have been received or processed yet by the far end */
 336        for (loop = call->acks_unacked;
 337             loop != call->acks_head;
 338             loop = (loop + 1) &  (call->acks_winsz - 1)
 339             ) {
 340                p_txb = call->acks_window + loop;
 341                smp_read_barrier_depends();
 342                txb = (struct sk_buff *) (*p_txb & ~1);
 343                sp = rxrpc_skb(txb);
 344
 345                if (*p_txb & 1) {
 346                        /* packet must have been discarded */
 347                        sp->need_resend = 1;
 348                        *p_txb &= ~1;
 349                        resend |= 1;
 350                } else if (sp->need_resend) {
 351                        ;
 352                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 353                        sp->need_resend = 1;
 354                        resend |= 1;
 355                } else if (resend & 2) {
 356                        if (time_before(sp->resend_at, resend_at))
 357                                resend_at = sp->resend_at;
 358                } else {
 359                        resend_at = sp->resend_at;
 360                        resend |= 2;
 361                }
 362        }
 363
 364        rxrpc_set_resend(call, resend, resend_at);
 365        _leave(" = 0");
 366        return 0;
 367
 368protocol_error:
 369        _leave(" = -EPROTO");
 370        return -EPROTO;
 371}
 372
 373/*
 374 * discard hard-ACK'd packets from the Tx window
 375 */
 376static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 377{
 378        struct rxrpc_skb_priv *sp;
 379        unsigned long _skb;
 380        int tail = call->acks_tail, old_tail;
 381        int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 382
 383        _enter("{%u,%u},%u", call->acks_hard, win, hard);
 384
 385        ASSERTCMP(hard - call->acks_hard, <=, win);
 386
 387        while (call->acks_hard < hard) {
 388                smp_read_barrier_depends();
 389                _skb = call->acks_window[tail] & ~1;
 390                sp = rxrpc_skb((struct sk_buff *) _skb);
 391                rxrpc_free_skb((struct sk_buff *) _skb);
 392                old_tail = tail;
 393                tail = (tail + 1) & (call->acks_winsz - 1);
 394                call->acks_tail = tail;
 395                if (call->acks_unacked == old_tail)
 396                        call->acks_unacked = tail;
 397                call->acks_hard++;
 398        }
 399
 400        wake_up(&call->tx_waitq);
 401}
 402
 403/*
 404 * clear the Tx window in the event of a failure
 405 */
 406static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 407{
 408        rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 409}
 410
 411/*
 412 * drain the out of sequence received packet queue into the packet Rx queue
 413 */
 414static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 415{
 416        struct rxrpc_skb_priv *sp;
 417        struct sk_buff *skb;
 418        bool terminal;
 419        int ret;
 420
 421        _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 422
 423        spin_lock_bh(&call->lock);
 424
 425        ret = -ECONNRESET;
 426        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 427                goto socket_unavailable;
 428
 429        skb = skb_dequeue(&call->rx_oos_queue);
 430        if (skb) {
 431                sp = rxrpc_skb(skb);
 432
 433                _debug("drain OOS packet %d [%d]",
 434                       ntohl(sp->hdr.seq), call->rx_first_oos);
 435
 436                if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 437                        skb_queue_head(&call->rx_oos_queue, skb);
 438                        call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 439                        _debug("requeue %p {%u}", skb, call->rx_first_oos);
 440                } else {
 441                        skb->mark = RXRPC_SKB_MARK_DATA;
 442                        terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 443                                !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 444                        ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 445                        BUG_ON(ret < 0);
 446                        _debug("drain #%u", call->rx_data_post);
 447                        call->rx_data_post++;
 448
 449                        /* find out what the next packet is */
 450                        skb = skb_peek(&call->rx_oos_queue);
 451                        if (skb)
 452                                call->rx_first_oos =
 453                                        ntohl(rxrpc_skb(skb)->hdr.seq);
 454                        else
 455                                call->rx_first_oos = 0;
 456                        _debug("peek %p {%u}", skb, call->rx_first_oos);
 457                }
 458        }
 459
 460        ret = 0;
 461socket_unavailable:
 462        spin_unlock_bh(&call->lock);
 463        _leave(" = %d", ret);
 464        return ret;
 465}
 466
 467/*
 468 * insert an out of sequence packet into the buffer
 469 */
 470static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 471                                    struct sk_buff *skb)
 472{
 473        struct rxrpc_skb_priv *sp, *psp;
 474        struct sk_buff *p;
 475        u32 seq;
 476
 477        sp = rxrpc_skb(skb);
 478        seq = ntohl(sp->hdr.seq);
 479        _enter(",,{%u}", seq);
 480
 481        skb->destructor = rxrpc_packet_destructor;
 482        ASSERTCMP(sp->call, ==, NULL);
 483        sp->call = call;
 484        rxrpc_get_call(call);
 485
 486        /* insert into the buffer in sequence order */
 487        spin_lock_bh(&call->lock);
 488
 489        skb_queue_walk(&call->rx_oos_queue, p) {
 490                psp = rxrpc_skb(p);
 491                if (ntohl(psp->hdr.seq) > seq) {
 492                        _debug("insert oos #%u before #%u",
 493                               seq, ntohl(psp->hdr.seq));
 494                        skb_insert(p, skb, &call->rx_oos_queue);
 495                        goto inserted;
 496                }
 497        }
 498
 499        _debug("append oos #%u", seq);
 500        skb_queue_tail(&call->rx_oos_queue, skb);
 501inserted:
 502
 503        /* we might now have a new front to the queue */
 504        if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 505                call->rx_first_oos = seq;
 506
 507        read_lock(&call->state_lock);
 508        if (call->state < RXRPC_CALL_COMPLETE &&
 509            call->rx_data_post == call->rx_first_oos) {
 510                _debug("drain rx oos now");
 511                set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 512        }
 513        read_unlock(&call->state_lock);
 514
 515        spin_unlock_bh(&call->lock);
 516        _leave(" [stored #%u]", call->rx_first_oos);
 517}
 518
 519/*
 520 * clear the Tx window on final ACK reception
 521 */
 522static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 523{
 524        struct rxrpc_skb_priv *sp;
 525        struct sk_buff *skb;
 526        unsigned long _skb, *acks_window;
 527        u8 winsz = call->acks_winsz;
 528        int tail;
 529
 530        acks_window = call->acks_window;
 531        call->acks_window = NULL;
 532
 533        while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 534                tail = call->acks_tail;
 535                smp_read_barrier_depends();
 536                _skb = acks_window[tail] & ~1;
 537                smp_mb();
 538                call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 539
 540                skb = (struct sk_buff *) _skb;
 541                sp = rxrpc_skb(skb);
 542                _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 543                rxrpc_free_skb(skb);
 544        }
 545
 546        kfree(acks_window);
 547}
 548
 549/*
 550 * process the extra information that may be appended to an ACK packet
 551 */
 552static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 553                                  unsigned latest, int nAcks)
 554{
 555        struct rxrpc_ackinfo ackinfo;
 556        struct rxrpc_peer *peer;
 557        unsigned mtu;
 558
 559        if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 560                _leave(" [no ackinfo]");
 561                return;
 562        }
 563
 564        _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 565               latest,
 566               ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 567               ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 568
 569        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 570
 571        peer = call->conn->trans->peer;
 572        if (mtu < peer->maxdata) {
 573                spin_lock_bh(&peer->lock);
 574                peer->maxdata = mtu;
 575                peer->mtu = mtu + peer->hdrsize;
 576                spin_unlock_bh(&peer->lock);
 577                _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 578        }
 579}
 580
 581/*
 582 * process packets in the reception queue
 583 */
 584static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 585                                  u32 *_abort_code)
 586{
 587        struct rxrpc_ackpacket ack;
 588        struct rxrpc_skb_priv *sp;
 589        struct sk_buff *skb;
 590        bool post_ACK;
 591        int latest;
 592        u32 hard, tx;
 593
 594        _enter("");
 595
 596process_further:
 597        skb = skb_dequeue(&call->rx_queue);
 598        if (!skb)
 599                return -EAGAIN;
 600
 601        _net("deferred skb %p", skb);
 602
 603        sp = rxrpc_skb(skb);
 604
 605        _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 606
 607        post_ACK = false;
 608
 609        switch (sp->hdr.type) {
 610                /* data packets that wind up here have been received out of
 611                 * order, need security processing or are jumbo packets */
 612        case RXRPC_PACKET_TYPE_DATA:
 613                _proto("OOSQ DATA %%%u { #%u }",
 614                       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 615
 616                /* secured packets must be verified and possibly decrypted */
 617                if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 618                        goto protocol_error;
 619
 620                rxrpc_insert_oos_packet(call, skb);
 621                goto process_further;
 622
 623                /* partial ACK to process */
 624        case RXRPC_PACKET_TYPE_ACK:
 625                if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 626                        _debug("extraction failure");
 627                        goto protocol_error;
 628                }
 629                if (!skb_pull(skb, sizeof(ack)))
 630                        BUG();
 631
 632                latest = ntohl(sp->hdr.serial);
 633                hard = ntohl(ack.firstPacket);
 634                tx = atomic_read(&call->sequence);
 635
 636                _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 637                       latest,
 638                       ntohs(ack.maxSkew),
 639                       hard,
 640                       ntohl(ack.previousPacket),
 641                       ntohl(ack.serial),
 642                       rxrpc_acks[ack.reason],
 643                       ack.nAcks);
 644
 645                rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 646
 647                if (ack.reason == RXRPC_ACK_PING) {
 648                        _proto("Rx ACK %%%u PING Request", latest);
 649                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 650                                          sp->hdr.serial, true);
 651                }
 652
 653                /* discard any out-of-order or duplicate ACKs */
 654                if (latest - call->acks_latest <= 0) {
 655                        _debug("discard ACK %d <= %d",
 656                               latest, call->acks_latest);
 657                        goto discard;
 658                }
 659                call->acks_latest = latest;
 660
 661                if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 662                    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 663                    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 664                    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 665                        goto discard;
 666
 667                _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 668
 669                if (hard > 0) {
 670                        if (hard - 1 > tx) {
 671                                _debug("hard-ACK'd packet %d not transmitted"
 672                                       " (%d top)",
 673                                       hard - 1, tx);
 674                                goto protocol_error;
 675                        }
 676
 677                        if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 678                             call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 679                            hard > tx)
 680                                goto all_acked;
 681
 682                        smp_rmb();
 683                        rxrpc_rotate_tx_window(call, hard - 1);
 684                }
 685
 686                if (ack.nAcks > 0) {
 687                        if (hard - 1 + ack.nAcks > tx) {
 688                                _debug("soft-ACK'd packet %d+%d not"
 689                                       " transmitted (%d top)",
 690                                       hard - 1, ack.nAcks, tx);
 691                                goto protocol_error;
 692                        }
 693
 694                        if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 695                                goto protocol_error;
 696                }
 697                goto discard;
 698
 699                /* complete ACK to process */
 700        case RXRPC_PACKET_TYPE_ACKALL:
 701                goto all_acked;
 702
 703                /* abort and busy are handled elsewhere */
 704        case RXRPC_PACKET_TYPE_BUSY:
 705        case RXRPC_PACKET_TYPE_ABORT:
 706                BUG();
 707
 708                /* connection level events - also handled elsewhere */
 709        case RXRPC_PACKET_TYPE_CHALLENGE:
 710        case RXRPC_PACKET_TYPE_RESPONSE:
 711        case RXRPC_PACKET_TYPE_DEBUG:
 712                BUG();
 713        }
 714
 715        /* if we've had a hard ACK that covers all the packets we've sent, then
 716         * that ends that phase of the operation */
 717all_acked:
 718        write_lock_bh(&call->state_lock);
 719        _debug("ack all %d", call->state);
 720
 721        switch (call->state) {
 722        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 723                call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 724                break;
 725        case RXRPC_CALL_SERVER_AWAIT_ACK:
 726                _debug("srv complete");
 727                call->state = RXRPC_CALL_COMPLETE;
 728                post_ACK = true;
 729                break;
 730        case RXRPC_CALL_CLIENT_SEND_REQUEST:
 731        case RXRPC_CALL_SERVER_RECV_REQUEST:
 732                goto protocol_error_unlock; /* can't occur yet */
 733        default:
 734                write_unlock_bh(&call->state_lock);
 735                goto discard; /* assume packet left over from earlier phase */
 736        }
 737
 738        write_unlock_bh(&call->state_lock);
 739
 740        /* if all the packets we sent are hard-ACK'd, then we can discard
 741         * whatever we've got left */
 742        _debug("clear Tx %d",
 743               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 744
 745        del_timer_sync(&call->resend_timer);
 746        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 747        clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 748
 749        if (call->acks_window)
 750                rxrpc_zap_tx_window(call);
 751
 752        if (post_ACK) {
 753                /* post the final ACK message for userspace to pick up */
 754                _debug("post ACK");
 755                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 756                sp->call = call;
 757                rxrpc_get_call(call);
 758                spin_lock_bh(&call->lock);
 759                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 760                        BUG();
 761                spin_unlock_bh(&call->lock);
 762                goto process_further;
 763        }
 764
 765discard:
 766        rxrpc_free_skb(skb);
 767        goto process_further;
 768
 769protocol_error_unlock:
 770        write_unlock_bh(&call->state_lock);
 771protocol_error:
 772        rxrpc_free_skb(skb);
 773        _leave(" = -EPROTO");
 774        return -EPROTO;
 775}
 776
 777/*
 778 * post a message to the socket Rx queue for recvmsg() to pick up
 779 */
 780static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 781                              bool fatal)
 782{
 783        struct rxrpc_skb_priv *sp;
 784        struct sk_buff *skb;
 785        int ret;
 786
 787        _enter("{%d,%lx},%u,%u,%d",
 788               call->debug_id, call->flags, mark, error, fatal);
 789
 790        /* remove timers and things for fatal messages */
 791        if (fatal) {
 792                del_timer_sync(&call->resend_timer);
 793                del_timer_sync(&call->ack_timer);
 794                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 795        }
 796
 797        if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 798            !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 799                _leave("[no userid]");
 800                return 0;
 801        }
 802
 803        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 804                skb = alloc_skb(0, GFP_NOFS);
 805                if (!skb)
 806                        return -ENOMEM;
 807
 808                rxrpc_new_skb(skb);
 809
 810                skb->mark = mark;
 811
 812                sp = rxrpc_skb(skb);
 813                memset(sp, 0, sizeof(*sp));
 814                sp->error = error;
 815                sp->call = call;
 816                rxrpc_get_call(call);
 817
 818                spin_lock_bh(&call->lock);
 819                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 820                spin_unlock_bh(&call->lock);
 821                BUG_ON(ret < 0);
 822        }
 823
 824        return 0;
 825}
 826
 827/*
 828 * handle background processing of incoming call packets and ACK / abort
 829 * generation
     *
     * This is the work function for call->processor.  The RXRPC_CALL_PROC_BUSY
     * flag is used to stop two instances running for the same call at once.
     *
     * Pending bits in call->events are examined in a fixed order, "final"
     * events (release, errors, aborts, final ACK, timeout) first, then the Rx
     * queue, resends, ordinary ACKs and finally the accept/secure handling.
     * Most branches handle one event and then jump to the common tail, which
     * requeues the work item if any events remain.  At most one ACK, BUSY or
     * ABORT packet is transmitted directly by this function per invocation
     * (via the send_message_2 label).
     *
     * Event bits are generally cleared only after the associated message has
     * been posted or sent, so an allocation or transmission failure leaves the
     * bit set and the event is retried on a later pass.
 830 */
 831void rxrpc_process_call(struct work_struct *work)
 832{
 833        struct rxrpc_call *call =
 834                container_of(work, struct rxrpc_call, processor);
 835        struct rxrpc_ackpacket ack;
 836        struct rxrpc_ackinfo ackinfo;
 837        struct rxrpc_header hdr;
 838        struct msghdr msg;
 839        struct kvec iov[5];
 840        unsigned long bits;
 841        __be32 data, pad;
 842        size_t len;
 843        int genbit, loop, nbit, ioc, ret, mtu;
 844        u32 abort_code = RX_PROTOCOL_ERROR;
            /* soft-ACK array; only allocated on the ordinary ACK path and
             * freed unconditionally at the end (kfree(NULL) is harmless) */
 845        u8 *acks = NULL;
 846
 847        //printk("\n--------------------\n");
            /* the last value logged is the call's age in units of HZ/10
             * jiffies, i.e. tenths of a second */
 848        _enter("{%d,%s,%lx} [%lu]",
 849               call->debug_id, rxrpc_call_states[call->state], call->events,
 850               (jiffies - call->creation_jif) / (HZ / 10));
 851
            /* claim the call for processing; if another instance already holds
             * the flag, just return - the holder re-checks call->events after
             * it drops the flag (see the end of this function) */
 852        if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 853                _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 854                return;
 855        }
 856
 857        /* there's a good chance we're going to have to send a message, so set
 858         * one up in advance */
 859        msg.msg_name    = &call->conn->trans->peer->srx.transport.sin;
 860        msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
 861        msg.msg_control = NULL;
 862        msg.msg_controllen = 0;
 863        msg.msg_flags   = 0;
 864
            /* default to an ACK packet header; hdr.type is overridden below
             * for BUSY and ABORT, and hdr.serial is filled in just before
             * transmission */
 865        hdr.epoch       = call->conn->epoch;
 866        hdr.cid         = call->cid;
 867        hdr.callNumber  = call->call_id;
 868        hdr.seq         = 0;
 869        hdr.type        = RXRPC_PACKET_TYPE_ACK;
 870        hdr.flags       = call->conn->out_clientflag;
 871        hdr.userStatus  = 0;
 872        hdr.securityIndex = call->conn->security_ix;
 873        hdr._rsvd       = 0;
 874        hdr.serviceId   = call->conn->service_id;
 875
            /* iov[0] always carries the header; iov[1..4] start out empty and
             * the send_message_2 code works out how many elements to transmit
             * from the highest-numbered element with a non-zero length */
 876        memset(iov, 0, sizeof(iov));
 877        iov[0].iov_base = &hdr;
 878        iov[0].iov_len  = sizeof(hdr);
 879
 880        /* deal with events of a final nature */
 881        if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 882                rxrpc_release_call(call);
 883                clear_bit(RXRPC_CALL_RELEASE, &call->events);
 884        }
 885
            /* a network error was recorded against the peer: this takes
             * precedence over any pending connection abort, busy rejection or
             * local abort, so those events are discarded */
 886        if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 887                int error;
 888
 889                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 890                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 891                clear_bit(RXRPC_CALL_ABORT, &call->events);
 892
 893                error = call->conn->trans->peer->net_error;
 894                _debug("post net error %d", error);
 895
 896                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 897                                       error, true) < 0)
 898                        goto no_mem;
 899                clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 900                goto kill_ACKs;
 901        }
 902
            /* the connection was aborted: report the connection's error to the
             * socket and drop any pending busy rejection or local abort */
 903        if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 904                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 905
 906                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 907                clear_bit(RXRPC_CALL_ABORT, &call->events);
 908
 909                _debug("post conn abort");
 910
 911                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 912                                       call->conn->error, true) < 0)
 913                        goto no_mem;
 914                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 915                goto kill_ACKs;
 916        }
 917
            /* send a header-only BUSY packet; the event bit is cleared after
             * a successful send via the default case of switch (genbit) */
 918        if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 919                hdr.type = RXRPC_PACKET_TYPE_BUSY;
 920                genbit = RXRPC_CALL_REJECT_BUSY;
 921                goto send_message;
 922        }
 923
            /* local abort: tell the socket, then send an ABORT packet whose
             * payload is the abort code in network byte order */
 924        if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 925                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 926
 927                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 928                                       ECONNABORTED, true) < 0)
 929                        goto no_mem;
 930                hdr.type = RXRPC_PACKET_TYPE_ABORT;
 931                data = htonl(call->abort_code);
 932                iov[1].iov_base = &data;
 933                iov[1].iov_len = sizeof(data);
 934                genbit = RXRPC_CALL_ABORT;
 935                goto send_message;
 936        }
 937
            /* final ACK: an IDLE ACK with no soft-ACK array (nAcks == 0), so
             * the packet is header + ack + 3 pad bytes + ackinfo */
 938        if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 939                genbit = RXRPC_CALL_ACK_FINAL;
 940
 941                ack.bufferSpace = htons(8);
 942                ack.maxSkew     = 0;
 943                ack.serial      = 0;
 944                ack.reason      = RXRPC_ACK_IDLE;
 945                ack.nAcks       = 0;
 946                call->ackr_reason = 0;
 947
                    /* sample the ACK state under call->lock */
 948                spin_lock_bh(&call->lock);
 949                ack.serial = call->ackr_serial;
 950                ack.previousPacket = call->ackr_prev_seq;
 951                ack.firstPacket = htonl(call->rx_data_eaten + 1);
 952                spin_unlock_bh(&call->lock);
 953
 954                pad = 0;
 955
 956                iov[1].iov_base = &ack;
 957                iov[1].iov_len  = sizeof(ack);
 958                iov[2].iov_base = &pad;
 959                iov[2].iov_len  = 3;
                    /* ackinfo itself is filled in at the send_ACK label */
 960                iov[3].iov_base = &ackinfo;
 961                iov[3].iov_len  = sizeof(ackinfo);
 962                goto send_ACK;
 963        }
 964
            /* the peer sent BUSY or ABORT (both bits are tested in one go):
             * throw away the Tx window and notify the socket; an ABORT takes
             * precedence over BUSY when choosing the message mark */
 965        if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
 966                            (1 << RXRPC_CALL_RCVD_ABORT))
 967            ) {
 968                u32 mark;
 969
 970                if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
 971                        mark = RXRPC_SKB_MARK_REMOTE_ABORT;
 972                else
 973                        mark = RXRPC_SKB_MARK_BUSY;
 974
 975                _debug("post abort/busy");
 976                rxrpc_clear_tx_window(call);
 977                if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
 978                        goto no_mem;
 979
 980                clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
 981                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 982                goto kill_ACKs;
 983        }
 984
 985        if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
 986                _debug("do implicit ackall");
 987                rxrpc_clear_tx_window(call);
 988        }
 989
            /* the call's lifetime timer fired: if the call state has not gone
             * beyond COMPLETE, mark it locally aborted with RX_CALL_TIMEOUT
             * and raise the ABORT event (which a later pass will act upon),
             * then report ETIME to the socket */
 990        if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
 991                write_lock_bh(&call->state_lock);
 992                if (call->state <= RXRPC_CALL_COMPLETE) {
 993                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
 994                        call->abort_code = RX_CALL_TIMEOUT;
 995                        set_bit(RXRPC_CALL_ABORT, &call->events);
 996                }
 997                write_unlock_bh(&call->state_lock);
 998
 999                _debug("post timeout");
1000                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
1001                                       ETIME, true) < 0)
1002                        goto no_mem;
1003
1004                clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1005                goto kill_ACKs;
1006        }
1007
1008        /* deal with assorted inbound messages */
1009        if (!skb_queue_empty(&call->rx_queue)) {
                    /* any return value not listed below is ignored and
                     * processing simply continues */
1010                switch (rxrpc_process_rx_queue(call, &abort_code)) {
1011                case 0:
1012                case -EAGAIN:
1013                        break;
1014                case -ENOMEM:
1015                        goto no_mem;
1016                case -EKEYEXPIRED:
1017                case -EKEYREJECTED:
1018                case -EPROTO:
                            /* abort_code keeps its RX_PROTOCOL_ERROR default
                             * unless rxrpc_process_rx_queue() overwrote it */
1019                        rxrpc_abort_call(call, abort_code);
1020                        goto kill_ACKs;
1021                }
1022        }
1023
1024        /* handle resending */
1025        if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1026                rxrpc_resend_timer(call);
1027        if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1028                rxrpc_resend(call);
1029
1030        /* consider sending an ordinary ACK */
1031        if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1032                _debug("send ACK: window: %d - %d { %lx }",
1033                       call->rx_data_eaten, call->ackr_win_top,
1034                       call->ackr_window[0]);
1035
1036                if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1037                    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1038                        /* ACK by sending reply DATA packet in this state */
1039                        clear_bit(RXRPC_CALL_ACK, &call->events);
1040                        goto maybe_reschedule;
1041                }
1042
1043                genbit = RXRPC_CALL_ACK;
1044
                    /* one byte of soft-ACK state per sequence number between
                     * rx_data_eaten and ackr_win_top, zero-filled */
1045                acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1046                               GFP_NOFS);
1047                if (!acks)
1048                        goto no_mem;
1049
1050                //hdr.flags     = RXRPC_SLOW_START_OK;
1051                ack.bufferSpace = htons(8);
1052                ack.maxSkew     = 0;
1053                ack.serial      = 0;
1054                ack.reason      = 0;
1055
                    /* snapshot the proposed ACK and the receive window bitmap
                     * under call->lock, then reset ackr_reason so that a new
                     * ACK can be proposed */
1056                spin_lock_bh(&call->lock);
1057                ack.reason = call->ackr_reason;
1058                ack.serial = call->ackr_serial;
1059                ack.previousPacket = call->ackr_prev_seq;
1060                ack.firstPacket = htonl(call->rx_data_eaten + 1);
1061
                    /* translate each set bit of ackr_window[] into an
                     * RXRPC_ACK_TYPE_ACK byte; nAcks ends up as the index of
                     * the highest set bit plus one (or 0 if none are set).
                     *
                     * NOTE(review): nbit is not checked against the size of
                     * acks[]; this presumably relies on the window bitmap
                     * never having a bit set at or beyond
                     * (ackr_win_top - rx_data_eaten) - confirm against the
                     * code that maintains ackr_window. */
1062                ack.nAcks = 0;
1063                for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1064                        nbit = loop * BITS_PER_LONG;
1065                        for (bits = call->ackr_window[loop]; bits; bits >>= 1
1066                             ) {
1067                                _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1068                                if (bits & 1) {
1069                                        acks[nbit] = RXRPC_ACK_TYPE_ACK;
1070                                        ack.nAcks = nbit + 1;
1071                                }
1072                                nbit++;
1073                        }
1074                }
1075                call->ackr_reason = 0;
1076                spin_unlock_bh(&call->lock);
1077
1078                pad = 0;
1079
                    /* packet layout: header, ack, nAcks soft-ACK bytes, 3 pad
                     * bytes, ackinfo (filled in at the send_ACK label) */
1080                iov[1].iov_base = &ack;
1081                iov[1].iov_len  = sizeof(ack);
1082                iov[2].iov_base = acks;
1083                iov[2].iov_len  = ack.nAcks;
1084                iov[3].iov_base = &pad;
1085                iov[3].iov_len  = 3;
1086                iov[4].iov_base = &ackinfo;
1087                iov[4].iov_len  = sizeof(ackinfo);
1088
                    /* DELAY and IDLE ACKs go out with maxSkew left at zero;
                     * the other reasons have the skew calculated first.
                     *
                     * NOTE(review): there is no default case, so for any
                     * other reason value (including 0) control leaves this
                     * block without sending anything and with RXRPC_CALL_ACK
                     * still set; acks is still freed at the end of the
                     * function. */
1089                switch (ack.reason) {
1090                case RXRPC_ACK_REQUESTED:
1091                case RXRPC_ACK_DUPLICATE:
1092                case RXRPC_ACK_OUT_OF_SEQUENCE:
1093                case RXRPC_ACK_EXCEEDS_WINDOW:
1094                case RXRPC_ACK_NOSPACE:
1095                case RXRPC_ACK_PING:
1096                case RXRPC_ACK_PING_RESPONSE:
1097                        goto send_ACK_with_skew;
1098                case RXRPC_ACK_DELAY:
1099                case RXRPC_ACK_IDLE:
1100                        goto send_ACK;
1101                }
1102        }
1103
1104        /* handle completion of security negotiations on an incoming
1105         * connection */
1106        if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1107                _debug("secured");
1108                spin_lock_bh(&call->lock);
1109
1110                if (call->state == RXRPC_CALL_SERVER_SECURING) {
1111                        _debug("securing");
                            /* unless the call has been or is being released,
                             * move it to the ACCEPTING state and onto the
                             * tail of the socket's accept queue */
1112                        write_lock(&call->conn->lock);
1113                        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1114                            !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1115                                _debug("not released");
1116                                call->state = RXRPC_CALL_SERVER_ACCEPTING;
1117                                list_move_tail(&call->accept_link,
1118                                               &call->socket->acceptq);
1119                        }
1120                        write_unlock(&call->conn->lock);
1121                        read_lock(&call->state_lock);
1122                        if (call->state < RXRPC_CALL_COMPLETE)
1123                                set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1124                        read_unlock(&call->state_lock);
1125                }
1126
1127                spin_unlock_bh(&call->lock);
1128                if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1129                        goto maybe_reschedule;
1130        }
1131
1132        /* post a notification of an acceptable connection to the app */
1133        if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1134                _debug("post accept");
1135                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1136                                       0, false) < 0)
1137                        goto no_mem;
1138                clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1139                goto maybe_reschedule;
1140        }
1141
1142        /* handle incoming call acceptance */
1143        if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1144                _debug("accepted");
1145                ASSERTCMP(call->rx_data_post, ==, 0);
1146                call->rx_data_post = 1;
1147                read_lock_bh(&call->state_lock);
1148                if (call->state < RXRPC_CALL_COMPLETE)
1149                        set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1150                read_unlock_bh(&call->state_lock);
1151        }
1152
1153        /* drain the out of sequence received packet queue into the packet Rx
1154         * queue */
1155        if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
                    /* keep draining while the next packet to be posted is the
                     * first one on the OOS queue, stopping on error */
1156                while (call->rx_data_post == call->rx_first_oos)
1157                        if (rxrpc_drain_rx_oos_queue(call) < 0)
1158                                break;
1159                goto maybe_reschedule;
1160        }
1161
1162        /* other events may have been raised since we started checking */
1163        goto maybe_reschedule;
1164
1165send_ACK_with_skew:
            /* skew = highest serial recorded on the connection minus the
             * serial number being ACK'd */
1166        ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1167                            ntohl(ack.serial));
1168send_ACK:
            /* fill in the trailing ackinfo: the advertised maximum MTU is the
             * peer's interface MTU less its header size */
1169        mtu = call->conn->trans->peer->if_mtu;
1170        mtu -= call->conn->trans->peer->hdrsize;
1171        ackinfo.maxMTU  = htonl(mtu);
1172        ackinfo.rwind   = htonl(32);
1173
1174        /* permit the peer to send us jumbo packets if it wants to */
1175        ackinfo.rxMTU   = htonl(5692);
1176        ackinfo.jumbo_max = htonl(4);
1177
            /* allocate the next serial number on the connection */
1178        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1179        _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1180               ntohl(hdr.serial),
1181               ntohs(ack.maxSkew),
1182               ntohl(ack.firstPacket),
1183               ntohl(ack.previousPacket),
1184               ntohl(ack.serial),
1185               rxrpc_acks[ack.reason],
1186               ack.nAcks);
1187
            /* an ACK is about to go out, so stop the ACK timer; note in the
             * call flags if this ACK carries any soft-ACK entries */
1188        del_timer_sync(&call->ack_timer);
1189        if (ack.nAcks > 0)
1190                set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1191        goto send_message_2;
1192
1193send_message:
            /* entry point for BUSY and ABORT packets */
1194        _debug("send message");
1195
1196        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1197        _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1198send_message_2:
1199
            /* work out the element count and total length: the highest
             * numbered iov element with a non-zero length determines how many
             * elements are sent, and the lengths of all elements up to it are
             * summed */
1200        len = iov[0].iov_len;
1201        ioc = 1;
1202        if (iov[4].iov_len) {
1203                ioc = 5;
1204                len += iov[4].iov_len;
1205                len += iov[3].iov_len;
1206                len += iov[2].iov_len;
1207                len += iov[1].iov_len;
1208        } else if (iov[3].iov_len) {
1209                ioc = 4;
1210                len += iov[3].iov_len;
1211                len += iov[2].iov_len;
1212                len += iov[1].iov_len;
1213        } else if (iov[2].iov_len) {
1214                ioc = 3;
1215                len += iov[2].iov_len;
1216                len += iov[1].iov_len;
1217        } else if (iov[1].iov_len) {
1218                ioc = 2;
1219                len += iov[1].iov_len;
1220        }
1221
1222        ret = kernel_sendmsg(call->conn->trans->local->socket,
1223                             &msg, iov, ioc, len);
1224        if (ret < 0) {
                    /* transmission failed: the generating event bit is left
                     * set and the call is requeued (unless its state has
                     * reached DEAD) so that the send is retried; the
                     * kill_ACKs / maybe_reschedule handling is skipped */
1225                _debug("sendmsg failed: %d", ret);
1226                read_lock_bh(&call->state_lock);
1227                if (call->state < RXRPC_CALL_DEAD)
1228                        rxrpc_queue_call(call);
1229                read_unlock_bh(&call->state_lock);
1230                goto error;
1231        }
1232
            /* the packet went out; retire the event that generated it */
1233        switch (genbit) {
1234        case RXRPC_CALL_ABORT:
1235                clear_bit(genbit, &call->events);
1236                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1237                goto kill_ACKs;
1238
1239        case RXRPC_CALL_ACK_FINAL:
                    /* the ACK_FINAL event bit itself is cleared at kill_ACKs,
                     * where a reference on the call is also dropped */
1240                write_lock_bh(&call->state_lock);
1241                if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1242                        call->state = RXRPC_CALL_COMPLETE;
1243                write_unlock_bh(&call->state_lock);
1244                goto kill_ACKs;
1245
1246        default:
1247                clear_bit(genbit, &call->events);
                    /* in the states listed below, propose a non-immediate
                     * DELAY ACK for the last recorded serial */
1248                switch (call->state) {
1249                case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1250                case RXRPC_CALL_CLIENT_RECV_REPLY:
1251                case RXRPC_CALL_SERVER_RECV_REQUEST:
1252                case RXRPC_CALL_SERVER_ACK_REQUEST:
1253                        _debug("start ACK timer");
1254                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1255                                          call->ackr_serial, false);
                            /* fall through */
1256                default:
1257                        break;
1258                }
1259                goto maybe_reschedule;
1260        }
1261
1262kill_ACKs:
            /* stop the ACK timer and discard any pending ACK events; a pending
             * final ACK event also has a call reference dropped here */
1263        del_timer_sync(&call->ack_timer);
1264        if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1265                rxrpc_put_call(call);
1266        clear_bit(RXRPC_CALL_ACK, &call->events);
1267
1268maybe_reschedule:
            /* requeue ourselves if there's still work outstanding, provided
             * the call state has not reached DEAD */
1269        if (call->events || !skb_queue_empty(&call->rx_queue)) {
1270                read_lock_bh(&call->state_lock);
1271                if (call->state < RXRPC_CALL_DEAD)
1272                        rxrpc_queue_call(call);
1273                read_unlock_bh(&call->state_lock);
1274        }
1275
1276        /* don't leave aborted connections on the accept queue */
1277        if (call->state >= RXRPC_CALL_COMPLETE &&
1278            !list_empty(&call->accept_link)) {
1279                _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1280                       call, call->events, call->flags,
1281                       ntohl(call->conn->cid));
1282
                    /* raise the RELEASE event (handled at the top of this
                     * function) unless the call has already been released or
                     * the event is already pending */
1283                read_lock_bh(&call->state_lock);
1284                if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1285                    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1286                        rxrpc_queue_call(call);
1287                read_unlock_bh(&call->state_lock);
1288        }
1289
1290error:
            /* common exit: release the processing guard and free the soft-ACK
             * array (acks is NULL if it was never allocated) */
1291        clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1292        kfree(acks);
1293
1294        /* because we don't want two CPUs both processing the work item for one
1295         * call at the same time, we use a flag to note when it's busy; however
1296         * this means there's a race between clearing the flag and setting the
1297         * work pending bit and the work item being processed again */
1298        if (call->events && !work_pending(&call->processor)) {
1299                _debug("jumpstart %x", ntohl(call->conn->cid));
1300                rxrpc_queue_call(call);
1301        }
1302
1303        _leave("");
1304        return;
1305
        /* only reached by goto: an allocation failed, so leave the event bits
         * as they are and go through the normal reschedule path */
1306no_mem:
1307        _debug("out of memory");
1308        goto maybe_reschedule;
1309}
1310