linux/net/rxrpc/ar-ack.c
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/slab.h>
  17#include <linux/udp.h>
  18#include <net/sock.h>
  19#include <net/af_rxrpc.h>
  20#include "ar-internal.h"
  21
  22static unsigned rxrpc_ack_defer = 1;
  23
  24static const char *const rxrpc_acks[] = {
  25        "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
  26        "-?-"
  27};
  28
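     /* ACK reason priorities - higher value wins; a newly proposed ACK only
      * replaces a pending one of strictly lower priority (see
      * __rxrpc_propose_ACK())
      */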
  29static const s8 rxrpc_ack_priority[] = {
  30        [0]                             = 0,
  31        [RXRPC_ACK_DELAY]               = 1,
  32        [RXRPC_ACK_REQUESTED]           = 2,
  33        [RXRPC_ACK_IDLE]                = 3,
  34        [RXRPC_ACK_PING_RESPONSE]       = 4,
  35        [RXRPC_ACK_DUPLICATE]           = 5,
  36        [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
  37        [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
  38        [RXRPC_ACK_NOSPACE]             = 8,
  39};
  40
  41/*
  42 * propose an ACK be sent
  43 */
  44void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  45                         __be32 serial, bool immediate)
  46{
  47        unsigned long expiry;
  48        s8 prior = rxrpc_ack_priority[ack_reason];
  49
  50        ASSERTCMP(prior, >, 0);
  51
  52        _enter("{%d},%s,%%%x,%u",
  53               call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
  54               immediate);
  55
  56        if (prior < rxrpc_ack_priority[call->ackr_reason]) {
  57                if (immediate)
  58                        goto cancel_timer;
  59                return;
  60        }
  61
  62        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
  63         * numbers */
  64        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
  65                if (prior <= 4)
  66                        call->ackr_serial = serial;
  67                if (immediate)
  68                        goto cancel_timer;
  69                return;
  70        }
  71
  72        call->ackr_reason = ack_reason;
  73        call->ackr_serial = serial;
  74
  75        switch (ack_reason) {
  76        case RXRPC_ACK_DELAY:
  77                _debug("run delay timer");
  78                call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
  79                add_timer(&call->ack_timer);
  80                return;
  81
  82        case RXRPC_ACK_IDLE:
  83                if (!immediate) {
  84                        _debug("run defer timer");
  85                        expiry = 1;
  86                        goto run_timer;
  87                }
  88                goto cancel_timer;
  89
  90        case RXRPC_ACK_REQUESTED:
  91                if (!rxrpc_ack_defer)
  92                        goto cancel_timer;
  93                if (!immediate || serial == cpu_to_be32(1)) {
  94                        _debug("run defer timer");
  95                        expiry = rxrpc_ack_defer;
  96                        goto run_timer;
  97                }
   98                /* fall through */
  99        default:
 100                _debug("immediate ACK");
 101                goto cancel_timer;
 102        }
 103
 104run_timer:
 105        expiry += jiffies;
 106        if (!timer_pending(&call->ack_timer) ||
 107            time_after(call->ack_timer.expires, expiry))
 108                mod_timer(&call->ack_timer, expiry);
 109        return;
 110
 111cancel_timer:
 112        _debug("cancel timer %%%u", ntohl(serial));
 113        try_to_del_timer_sync(&call->ack_timer);
 114        read_lock_bh(&call->state_lock);
 115        if (call->state <= RXRPC_CALL_COMPLETE &&
 116            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 117                rxrpc_queue_call(call);
 118        read_unlock_bh(&call->state_lock);
 119}
 120
 121/*
 122 * propose an ACK be sent, locking the call structure
 123 */
 124void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 125                       __be32 serial, bool immediate)
 126{
 127        s8 prior = rxrpc_ack_priority[ack_reason];
 128
 129        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 130                spin_lock_bh(&call->lock);
 131                __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 132                spin_unlock_bh(&call->lock);
 133        }
 134}
 135
 136/*
 137 * set the resend timer
 138 */
 139static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 140                             unsigned long resend_at)
 141{
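             /* resend is a pair of flags: bit 0 means at least one packet is
              * due for retransmission now, bit 1 means resend_at holds the
              * earliest future retransmission time and the resend timer
              * should be rearmed for it
              */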
 142        read_lock_bh(&call->state_lock);
 143        if (call->state >= RXRPC_CALL_COMPLETE)
 144                resend = 0;
 145
 146        if (resend & 1) {
 147                _debug("SET RESEND");
 148                set_bit(RXRPC_CALL_RESEND, &call->events);
 149        }
 150
 151        if (resend & 2) {
 152                _debug("MODIFY RESEND TIMER");
 153                set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 154                mod_timer(&call->resend_timer, resend_at);
 155        } else {
 156                _debug("KILL RESEND TIMER");
 157                del_timer_sync(&call->resend_timer);
 158                clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 159                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 160        }
 161        read_unlock_bh(&call->state_lock);
 162}
 163
 164/*
 165 * resend packets
 166 */
 167static void rxrpc_resend(struct rxrpc_call *call)
 168{
 169        struct rxrpc_skb_priv *sp;
 170        struct rxrpc_header *hdr;
 171        struct sk_buff *txb;
 172        unsigned long *p_txb, resend_at;
 173        int loop, stop;
 174        u8 resend;
 175
 176        _enter("{%d,%d,%d,%d},",
 177               call->acks_hard, call->acks_unacked,
 178               atomic_read(&call->sequence),
 179               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 180
 181        stop = 0;
 182        resend = 0;
 183        resend_at = 0;
 184
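             /* each Tx window slot holds an sk_buff pointer with bit 0
              * borrowed as a "soft-ACK'd by the peer" flag, so the flag must
              * be masked off before the pointer is used
              */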
 185        for (loop = call->acks_tail;
  186             loop != call->acks_head && !stop;
 187             loop = (loop + 1) &  (call->acks_winsz - 1)
 188             ) {
 189                p_txb = call->acks_window + loop;
 190                smp_read_barrier_depends();
 191                if (*p_txb & 1)
 192                        continue;
 193
 194                txb = (struct sk_buff *) *p_txb;
 195                sp = rxrpc_skb(txb);
 196
 197                if (sp->need_resend) {
 198                        sp->need_resend = 0;
 199
 200                        /* each Tx packet has a new serial number */
 201                        sp->hdr.serial =
 202                                htonl(atomic_inc_return(&call->conn->serial));
 203
 204                        hdr = (struct rxrpc_header *) txb->head;
 205                        hdr->serial = sp->hdr.serial;
 206
 207                        _proto("Tx DATA %%%u { #%d }",
 208                               ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 209                        if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
  210                                stop = 1;
 211                                sp->resend_at = jiffies + 3;
 212                        } else {
 213                                sp->resend_at =
 214                                        jiffies + rxrpc_resend_timeout * HZ;
 215                        }
 216                }
 217
 218                if (time_after_eq(jiffies + 1, sp->resend_at)) {
 219                        sp->need_resend = 1;
 220                        resend |= 1;
 221                } else if (resend & 2) {
 222                        if (time_before(sp->resend_at, resend_at))
 223                                resend_at = sp->resend_at;
 224                } else {
 225                        resend_at = sp->resend_at;
 226                        resend |= 2;
 227                }
 228        }
 229
 230        rxrpc_set_resend(call, resend, resend_at);
 231        _leave("");
 232}
 233
 234/*
 235 * handle resend timer expiry
 236 */
 237static void rxrpc_resend_timer(struct rxrpc_call *call)
 238{
 239        struct rxrpc_skb_priv *sp;
 240        struct sk_buff *txb;
 241        unsigned long *p_txb, resend_at;
 242        int loop;
 243        u8 resend;
 244
 245        _enter("%d,%d,%d",
 246               call->acks_tail, call->acks_unacked, call->acks_head);
 247
 248        resend = 0;
 249        resend_at = 0;
 250
 251        for (loop = call->acks_unacked;
 252             loop != call->acks_head;
 253             loop = (loop + 1) &  (call->acks_winsz - 1)
 254             ) {
 255                p_txb = call->acks_window + loop;
 256                smp_read_barrier_depends();
 257                txb = (struct sk_buff *) (*p_txb & ~1);
 258                sp = rxrpc_skb(txb);
 259
 260                ASSERT(!(*p_txb & 1));
 261
 262                if (sp->need_resend) {
 263                        ;
 264                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 265                        sp->need_resend = 1;
 266                        resend |= 1;
 267                } else if (resend & 2) {
 268                        if (time_before(sp->resend_at, resend_at))
 269                                resend_at = sp->resend_at;
 270                } else {
 271                        resend_at = sp->resend_at;
 272                        resend |= 2;
 273                }
 274        }
 275
 276        rxrpc_set_resend(call, resend, resend_at);
 277        _leave("");
 278}
 279
 280/*
 281 * process soft ACKs of our transmitted packets
 282 * - these indicate packets the peer has or has not received, but hasn't yet
 283 *   given to the consumer, and so can still be discarded and re-requested
 284 */
 285static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 286                                   struct rxrpc_ackpacket *ack,
 287                                   struct sk_buff *skb)
 288{
 289        struct rxrpc_skb_priv *sp;
 290        struct sk_buff *txb;
 291        unsigned long *p_txb, resend_at;
 292        int loop;
 293        u8 sacks[RXRPC_MAXACKS], resend;
 294
 295        _enter("{%d,%d},{%d},",
 296               call->acks_hard,
 297               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 298               ack->nAcks);
 299
 300        if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 301                goto protocol_error;
 302
 303        resend = 0;
 304        resend_at = 0;
 305        for (loop = 0; loop < ack->nAcks; loop++) {
 306                p_txb = call->acks_window;
 307                p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 308                smp_read_barrier_depends();
 309                txb = (struct sk_buff *) (*p_txb & ~1);
 310                sp = rxrpc_skb(txb);
 311
 312                switch (sacks[loop]) {
 313                case RXRPC_ACK_TYPE_ACK:
 314                        sp->need_resend = 0;
 315                        *p_txb |= 1;
 316                        break;
 317                case RXRPC_ACK_TYPE_NACK:
 318                        sp->need_resend = 1;
 319                        *p_txb &= ~1;
 320                        resend = 1;
 321                        break;
 322                default:
 323                        _debug("Unsupported ACK type %d", sacks[loop]);
 324                        goto protocol_error;
 325                }
 326        }
 327
 328        smp_mb();
 329        call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 330
 331        /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 332         * have been received or processed yet by the far end */
 333        for (loop = call->acks_unacked;
 334             loop != call->acks_head;
 335             loop = (loop + 1) &  (call->acks_winsz - 1)
 336             ) {
 337                p_txb = call->acks_window + loop;
 338                smp_read_barrier_depends();
 339                txb = (struct sk_buff *) (*p_txb & ~1);
 340                sp = rxrpc_skb(txb);
 341
 342                if (*p_txb & 1) {
 343                        /* packet must have been discarded */
 344                        sp->need_resend = 1;
 345                        *p_txb &= ~1;
 346                        resend |= 1;
 347                } else if (sp->need_resend) {
 348                        ;
 349                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 350                        sp->need_resend = 1;
 351                        resend |= 1;
 352                } else if (resend & 2) {
 353                        if (time_before(sp->resend_at, resend_at))
 354                                resend_at = sp->resend_at;
 355                } else {
 356                        resend_at = sp->resend_at;
 357                        resend |= 2;
 358                }
 359        }
 360
 361        rxrpc_set_resend(call, resend, resend_at);
 362        _leave(" = 0");
 363        return 0;
 364
 365protocol_error:
 366        _leave(" = -EPROTO");
 367        return -EPROTO;
 368}
 369
 370/*
 371 * discard hard-ACK'd packets from the Tx window
 372 */
 373static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 374{
 375        struct rxrpc_skb_priv *sp;
 376        unsigned long _skb;
 377        int tail = call->acks_tail, old_tail;
 378        int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 379
 380        _enter("{%u,%u},%u", call->acks_hard, win, hard);
 381
 382        ASSERTCMP(hard - call->acks_hard, <=, win);
 383
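             /* hard is the highest hard-ACK'd sequence number; everything up
              * to and including it can now be freed from the window
              */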
 384        while (call->acks_hard < hard) {
 385                smp_read_barrier_depends();
 386                _skb = call->acks_window[tail] & ~1;
 387                sp = rxrpc_skb((struct sk_buff *) _skb);
 388                rxrpc_free_skb((struct sk_buff *) _skb);
 389                old_tail = tail;
 390                tail = (tail + 1) & (call->acks_winsz - 1);
 391                call->acks_tail = tail;
 392                if (call->acks_unacked == old_tail)
 393                        call->acks_unacked = tail;
 394                call->acks_hard++;
 395        }
 396
 397        wake_up(&call->tx_waitq);
 398}
 399
 400/*
 401 * clear the Tx window in the event of a failure
 402 */
 403static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 404{
 405        rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 406}
 407
 408/*
 409 * drain the out of sequence received packet queue into the packet Rx queue
 410 */
 411static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 412{
 413        struct rxrpc_skb_priv *sp;
 414        struct sk_buff *skb;
 415        bool terminal;
 416        int ret;
 417
 418        _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 419
 420        spin_lock_bh(&call->lock);
 421
 422        ret = -ECONNRESET;
 423        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 424                goto socket_unavailable;
 425
 426        skb = skb_dequeue(&call->rx_oos_queue);
 427        if (skb) {
 428                sp = rxrpc_skb(skb);
 429
 430                _debug("drain OOS packet %d [%d]",
 431                       ntohl(sp->hdr.seq), call->rx_first_oos);
 432
 433                if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 434                        skb_queue_head(&call->rx_oos_queue, skb);
 435                        call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 436                        _debug("requeue %p {%u}", skb, call->rx_first_oos);
 437                } else {
 438                        skb->mark = RXRPC_SKB_MARK_DATA;
 439                        terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 440                                !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 441                        ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 442                        BUG_ON(ret < 0);
 443                        _debug("drain #%u", call->rx_data_post);
 444                        call->rx_data_post++;
 445
 446                        /* find out what the next packet is */
 447                        skb = skb_peek(&call->rx_oos_queue);
 448                        if (skb)
 449                                call->rx_first_oos =
 450                                        ntohl(rxrpc_skb(skb)->hdr.seq);
 451                        else
 452                                call->rx_first_oos = 0;
 453                        _debug("peek %p {%u}", skb, call->rx_first_oos);
 454                }
 455        }
 456
 457        ret = 0;
 458socket_unavailable:
 459        spin_unlock_bh(&call->lock);
 460        _leave(" = %d", ret);
 461        return ret;
 462}
 463
 464/*
 465 * insert an out of sequence packet into the buffer
 466 */
 467static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 468                                    struct sk_buff *skb)
 469{
 470        struct rxrpc_skb_priv *sp, *psp;
 471        struct sk_buff *p;
 472        u32 seq;
 473
 474        sp = rxrpc_skb(skb);
 475        seq = ntohl(sp->hdr.seq);
 476        _enter(",,{%u}", seq);
 477
 478        skb->destructor = rxrpc_packet_destructor;
 479        ASSERTCMP(sp->call, ==, NULL);
 480        sp->call = call;
 481        rxrpc_get_call(call);
 482
 483        /* insert into the buffer in sequence order */
 484        spin_lock_bh(&call->lock);
 485
 486        skb_queue_walk(&call->rx_oos_queue, p) {
 487                psp = rxrpc_skb(p);
 488                if (ntohl(psp->hdr.seq) > seq) {
 489                        _debug("insert oos #%u before #%u",
 490                               seq, ntohl(psp->hdr.seq));
 491                        skb_insert(p, skb, &call->rx_oos_queue);
 492                        goto inserted;
 493                }
 494        }
 495
 496        _debug("append oos #%u", seq);
 497        skb_queue_tail(&call->rx_oos_queue, skb);
 498inserted:
 499
 500        /* we might now have a new front to the queue */
 501        if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 502                call->rx_first_oos = seq;
 503
 504        read_lock(&call->state_lock);
 505        if (call->state < RXRPC_CALL_COMPLETE &&
 506            call->rx_data_post == call->rx_first_oos) {
 507                _debug("drain rx oos now");
 508                set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 509        }
 510        read_unlock(&call->state_lock);
 511
 512        spin_unlock_bh(&call->lock);
 513        _leave(" [stored #%u]", call->rx_first_oos);
 514}
 515
 516/*
 517 * clear the Tx window on final ACK reception
 518 */
 519static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 520{
 521        struct rxrpc_skb_priv *sp;
 522        struct sk_buff *skb;
 523        unsigned long _skb, *acks_window;
 524        u8 winsz = call->acks_winsz;
 525        int tail;
 526
 527        acks_window = call->acks_window;
 528        call->acks_window = NULL;
 529
 530        while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 531                tail = call->acks_tail;
 532                smp_read_barrier_depends();
 533                _skb = acks_window[tail] & ~1;
 534                smp_mb();
 535                call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 536
 537                skb = (struct sk_buff *) _skb;
 538                sp = rxrpc_skb(skb);
 539                _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 540                rxrpc_free_skb(skb);
 541        }
 542
 543        kfree(acks_window);
 544}
 545
 546/*
 547 * process the extra information that may be appended to an ACK packet
 548 */
 549static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 550                                  unsigned latest, int nAcks)
 551{
 552        struct rxrpc_ackinfo ackinfo;
 553        struct rxrpc_peer *peer;
 554        unsigned mtu;
 555
 556        if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 557                _leave(" [no ackinfo]");
 558                return;
 559        }
 560
 561        _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 562               latest,
 563               ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 564               ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 565
 566        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 567
 568        peer = call->conn->trans->peer;
 569        if (mtu < peer->maxdata) {
 570                spin_lock_bh(&peer->lock);
 571                peer->maxdata = mtu;
 572                peer->mtu = mtu + peer->hdrsize;
 573                spin_unlock_bh(&peer->lock);
 574                _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 575        }
 576}
 577
 578/*
 579 * process packets in the reception queue
 580 */
 581static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 582                                  u32 *_abort_code)
 583{
 584        struct rxrpc_ackpacket ack;
 585        struct rxrpc_skb_priv *sp;
 586        struct sk_buff *skb;
 587        bool post_ACK;
 588        int latest;
 589        u32 hard, tx;
 590
 591        _enter("");
 592
 593process_further:
 594        skb = skb_dequeue(&call->rx_queue);
 595        if (!skb)
 596                return -EAGAIN;
 597
 598        _net("deferred skb %p", skb);
 599
 600        sp = rxrpc_skb(skb);
 601
 602        _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 603
 604        post_ACK = false;
 605
 606        switch (sp->hdr.type) {
 607                /* data packets that wind up here have been received out of
 608                 * order, need security processing or are jumbo packets */
 609        case RXRPC_PACKET_TYPE_DATA:
 610                _proto("OOSQ DATA %%%u { #%u }",
 611                       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 612
 613                /* secured packets must be verified and possibly decrypted */
 614                if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 615                        goto protocol_error;
 616
 617                rxrpc_insert_oos_packet(call, skb);
 618                goto process_further;
 619
 620                /* partial ACK to process */
 621        case RXRPC_PACKET_TYPE_ACK:
 622                if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 623                        _debug("extraction failure");
 624                        goto protocol_error;
 625                }
 626                if (!skb_pull(skb, sizeof(ack)))
 627                        BUG();
 628
 629                latest = ntohl(sp->hdr.serial);
 630                hard = ntohl(ack.firstPacket);
 631                tx = atomic_read(&call->sequence);
 632
 633                _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 634                       latest,
 635                       ntohs(ack.maxSkew),
 636                       hard,
 637                       ntohl(ack.previousPacket),
 638                       ntohl(ack.serial),
 639                       rxrpc_acks[ack.reason],
 640                       ack.nAcks);
 641
 642                rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 643
 644                if (ack.reason == RXRPC_ACK_PING) {
 645                        _proto("Rx ACK %%%u PING Request", latest);
 646                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 647                                          sp->hdr.serial, true);
 648                }
 649
 650                /* discard any out-of-order or duplicate ACKs */
 651                if (latest - call->acks_latest <= 0) {
 652                        _debug("discard ACK %d <= %d",
 653                               latest, call->acks_latest);
 654                        goto discard;
 655                }
 656                call->acks_latest = latest;
 657
 658                if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 659                    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 660                    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 661                    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 662                        goto discard;
 663
 664                _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 665
 666                if (hard > 0) {
 667                        if (hard - 1 > tx) {
 668                                _debug("hard-ACK'd packet %d not transmitted"
 669                                       " (%d top)",
 670                                       hard - 1, tx);
 671                                goto protocol_error;
 672                        }
 673
 674                        if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 675                             call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 676                            hard > tx)
 677                                goto all_acked;
 678
 679                        smp_rmb();
 680                        rxrpc_rotate_tx_window(call, hard - 1);
 681                }
 682
 683                if (ack.nAcks > 0) {
 684                        if (hard - 1 + ack.nAcks > tx) {
 685                                _debug("soft-ACK'd packet %d+%d not"
 686                                       " transmitted (%d top)",
 687                                       hard - 1, ack.nAcks, tx);
 688                                goto protocol_error;
 689                        }
 690
 691                        if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 692                                goto protocol_error;
 693                }
 694                goto discard;
 695
 696                /* complete ACK to process */
 697        case RXRPC_PACKET_TYPE_ACKALL:
 698                goto all_acked;
 699
 700                /* abort and busy are handled elsewhere */
 701        case RXRPC_PACKET_TYPE_BUSY:
 702        case RXRPC_PACKET_TYPE_ABORT:
 703                BUG();
 704
 705                /* connection level events - also handled elsewhere */
 706        case RXRPC_PACKET_TYPE_CHALLENGE:
 707        case RXRPC_PACKET_TYPE_RESPONSE:
 708        case RXRPC_PACKET_TYPE_DEBUG:
 709                BUG();
 710        }
 711
 712        /* if we've had a hard ACK that covers all the packets we've sent, then
 713         * that ends that phase of the operation */
 714all_acked:
 715        write_lock_bh(&call->state_lock);
 716        _debug("ack all %d", call->state);
 717
 718        switch (call->state) {
 719        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 720                call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 721                break;
 722        case RXRPC_CALL_SERVER_AWAIT_ACK:
 723                _debug("srv complete");
 724                call->state = RXRPC_CALL_COMPLETE;
 725                post_ACK = true;
 726                break;
 727        case RXRPC_CALL_CLIENT_SEND_REQUEST:
 728        case RXRPC_CALL_SERVER_RECV_REQUEST:
 729                goto protocol_error_unlock; /* can't occur yet */
 730        default:
 731                write_unlock_bh(&call->state_lock);
 732                goto discard; /* assume packet left over from earlier phase */
 733        }
 734
 735        write_unlock_bh(&call->state_lock);
 736
 737        /* if all the packets we sent are hard-ACK'd, then we can discard
 738         * whatever we've got left */
 739        _debug("clear Tx %d",
 740               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 741
 742        del_timer_sync(&call->resend_timer);
 743        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 744        clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 745
 746        if (call->acks_window)
 747                rxrpc_zap_tx_window(call);
 748
 749        if (post_ACK) {
 750                /* post the final ACK message for userspace to pick up */
 751                _debug("post ACK");
 752                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 753                sp->call = call;
 754                rxrpc_get_call(call);
 755                spin_lock_bh(&call->lock);
 756                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 757                        BUG();
 758                spin_unlock_bh(&call->lock);
 759                goto process_further;
 760        }
 761
 762discard:
 763        rxrpc_free_skb(skb);
 764        goto process_further;
 765
 766protocol_error_unlock:
 767        write_unlock_bh(&call->state_lock);
 768protocol_error:
 769        rxrpc_free_skb(skb);
 770        _leave(" = -EPROTO");
 771        return -EPROTO;
 772}
 773
 774/*
 775 * post a message to the socket Rx queue for recvmsg() to pick up
 776 */
 777static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 778                              bool fatal)
 779{
 780        struct rxrpc_skb_priv *sp;
 781        struct sk_buff *skb;
 782        int ret;
 783
 784        _enter("{%d,%lx},%u,%u,%d",
 785               call->debug_id, call->flags, mark, error, fatal);
 786
 787        /* remove timers and things for fatal messages */
 788        if (fatal) {
 789                del_timer_sync(&call->resend_timer);
 790                del_timer_sync(&call->ack_timer);
 791                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 792        }
 793
 794        if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 795            !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 796                _leave("[no userid]");
 797                return 0;
 798        }
 799
 800        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 801                skb = alloc_skb(0, GFP_NOFS);
 802                if (!skb)
 803                        return -ENOMEM;
 804
 805                rxrpc_new_skb(skb);
 806
 807                skb->mark = mark;
 808
 809                sp = rxrpc_skb(skb);
 810                memset(sp, 0, sizeof(*sp));
 811                sp->error = error;
 812                sp->call = call;
 813                rxrpc_get_call(call);
 814
 815                spin_lock_bh(&call->lock);
 816                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 817                spin_unlock_bh(&call->lock);
 818                BUG_ON(ret < 0);
 819        }
 820
 821        return 0;
 822}
 823
 824/*
 825 * handle background processing of incoming call packets and ACK / abort
 826 * generation
 827 */
 828void rxrpc_process_call(struct work_struct *work)
 829{
 830        struct rxrpc_call *call =
 831                container_of(work, struct rxrpc_call, processor);
 832        struct rxrpc_ackpacket ack;
 833        struct rxrpc_ackinfo ackinfo;
 834        struct rxrpc_header hdr;
 835        struct msghdr msg;
 836        struct kvec iov[5];
 837        unsigned long bits;
 838        __be32 data, pad;
 839        size_t len;
 840        int genbit, loop, nbit, ioc, ret, mtu;
 841        u32 abort_code = RX_PROTOCOL_ERROR;
 842        u8 *acks = NULL;
 843
 844        //printk("\n--------------------\n");
 845        _enter("{%d,%s,%lx} [%lu]",
 846               call->debug_id, rxrpc_call_states[call->state], call->events,
 847               (jiffies - call->creation_jif) / (HZ / 10));
 848
 849        if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 850                _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 851                return;
 852        }
 853
 854        /* there's a good chance we're going to have to send a message, so set
 855         * one up in advance */
 856        msg.msg_name    = &call->conn->trans->peer->srx.transport.sin;
 857        msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
 858        msg.msg_control = NULL;
 859        msg.msg_controllen = 0;
 860        msg.msg_flags   = 0;
 861
 862        hdr.epoch       = call->conn->epoch;
 863        hdr.cid         = call->cid;
 864        hdr.callNumber  = call->call_id;
 865        hdr.seq         = 0;
 866        hdr.type        = RXRPC_PACKET_TYPE_ACK;
 867        hdr.flags       = call->conn->out_clientflag;
 868        hdr.userStatus  = 0;
 869        hdr.securityIndex = call->conn->security_ix;
 870        hdr._rsvd       = 0;
 871        hdr.serviceId   = call->conn->service_id;
 872
 873        memset(iov, 0, sizeof(iov));
 874        iov[0].iov_base = &hdr;
 875        iov[0].iov_len  = sizeof(hdr);
 876
 877        /* deal with events of a final nature */
 878        if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 879                rxrpc_release_call(call);
 880                clear_bit(RXRPC_CALL_RELEASE, &call->events);
 881        }
 882
 883        if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 884                int error;
 885
 886                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 887                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 888                clear_bit(RXRPC_CALL_ABORT, &call->events);
 889
 890                error = call->conn->trans->peer->net_error;
 891                _debug("post net error %d", error);
 892
 893                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 894                                       error, true) < 0)
 895                        goto no_mem;
 896                clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 897                goto kill_ACKs;
 898        }
 899
 900        if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 901                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 902
 903                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 904                clear_bit(RXRPC_CALL_ABORT, &call->events);
 905
 906                _debug("post conn abort");
 907
 908                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 909                                       call->conn->error, true) < 0)
 910                        goto no_mem;
 911                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 912                goto kill_ACKs;
 913        }
 914
 915        if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 916                hdr.type = RXRPC_PACKET_TYPE_BUSY;
 917                genbit = RXRPC_CALL_REJECT_BUSY;
 918                goto send_message;
 919        }
 920
 921        if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 922                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 923
 924                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 925                                       ECONNABORTED, true) < 0)
 926                        goto no_mem;
 927                hdr.type = RXRPC_PACKET_TYPE_ABORT;
 928                data = htonl(call->abort_code);
 929                iov[1].iov_base = &data;
 930                iov[1].iov_len = sizeof(data);
 931                genbit = RXRPC_CALL_ABORT;
 932                goto send_message;
 933        }
 934
 935        if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 936                genbit = RXRPC_CALL_ACK_FINAL;
 937
 938                ack.bufferSpace = htons(8);
 939                ack.maxSkew     = 0;
 940                ack.serial      = 0;
 941                ack.reason      = RXRPC_ACK_IDLE;
 942                ack.nAcks       = 0;
 943                call->ackr_reason = 0;
 944
 945                spin_lock_bh(&call->lock);
 946                ack.serial = call->ackr_serial;
 947                ack.previousPacket = call->ackr_prev_seq;
 948                ack.firstPacket = htonl(call->rx_data_eaten + 1);
 949                spin_unlock_bh(&call->lock);
 950
 951                pad = 0;
 952
 953                iov[1].iov_base = &ack;
 954                iov[1].iov_len  = sizeof(ack);
 955                iov[2].iov_base = &pad;
 956                iov[2].iov_len  = 3;
 957                iov[3].iov_base = &ackinfo;
 958                iov[3].iov_len  = sizeof(ackinfo);
 959                goto send_ACK;
 960        }
 961
 962        if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
 963                            (1 << RXRPC_CALL_RCVD_ABORT))
 964            ) {
 965                u32 mark;
 966
 967                if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
 968                        mark = RXRPC_SKB_MARK_REMOTE_ABORT;
 969                else
 970                        mark = RXRPC_SKB_MARK_BUSY;
 971
 972                _debug("post abort/busy");
 973                rxrpc_clear_tx_window(call);
 974                if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
 975                        goto no_mem;
 976
 977                clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
 978                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 979                goto kill_ACKs;
 980        }
 981
 982        if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
 983                _debug("do implicit ackall");
 984                rxrpc_clear_tx_window(call);
 985        }
 986
 987        if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
 988                write_lock_bh(&call->state_lock);
 989                if (call->state <= RXRPC_CALL_COMPLETE) {
 990                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
 991                        call->abort_code = RX_CALL_TIMEOUT;
 992                        set_bit(RXRPC_CALL_ABORT, &call->events);
 993                }
 994                write_unlock_bh(&call->state_lock);
 995
 996                _debug("post timeout");
 997                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 998                                       ETIME, true) < 0)
 999                        goto no_mem;
1000
1001                clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1002                goto kill_ACKs;
1003        }
1004
1005        /* deal with assorted inbound messages */
1006        if (!skb_queue_empty(&call->rx_queue)) {
1007                switch (rxrpc_process_rx_queue(call, &abort_code)) {
1008                case 0:
1009                case -EAGAIN:
1010                        break;
1011                case -ENOMEM:
1012                        goto no_mem;
1013                case -EKEYEXPIRED:
1014                case -EKEYREJECTED:
1015                case -EPROTO:
1016                        rxrpc_abort_call(call, abort_code);
1017                        goto kill_ACKs;
1018                }
1019        }
1020
1021        /* handle resending */
1022        if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1023                rxrpc_resend_timer(call);
1024        if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1025                rxrpc_resend(call);
1026
1027        /* consider sending an ordinary ACK */
1028        if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1029                _debug("send ACK: window: %d - %d { %lx }",
1030                       call->rx_data_eaten, call->ackr_win_top,
1031                       call->ackr_window[0]);
1032
1033                if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1034                    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1035                        /* ACK by sending reply DATA packet in this state */
1036                        clear_bit(RXRPC_CALL_ACK, &call->events);
1037                        goto maybe_reschedule;
1038                }
1039
1040                genbit = RXRPC_CALL_ACK;
1041
1042                acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1043                               GFP_NOFS);
1044                if (!acks)
1045                        goto no_mem;
1046
1047                //hdr.flags     = RXRPC_SLOW_START_OK;
1048                ack.bufferSpace = htons(8);
1049                ack.maxSkew     = 0;
1050                ack.serial      = 0;
1051                ack.reason      = 0;
1052
1053                spin_lock_bh(&call->lock);
1054                ack.reason = call->ackr_reason;
1055                ack.serial = call->ackr_serial;
1056                ack.previousPacket = call->ackr_prev_seq;
1057                ack.firstPacket = htonl(call->rx_data_eaten + 1);
1058
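                     /* convert the Rx window bitmap into the soft-ACK byte
                      * array; slots left at zero read as RXRPC_ACK_TYPE_NACK
                      * and nAcks covers up to the highest ACK'd entry
                      */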
1059                ack.nAcks = 0;
1060                for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1061                        nbit = loop * BITS_PER_LONG;
1062                        for (bits = call->ackr_window[loop]; bits; bits >>= 1
1063                             ) {
1064                                _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1065                                if (bits & 1) {
1066                                        acks[nbit] = RXRPC_ACK_TYPE_ACK;
1067                                        ack.nAcks = nbit + 1;
1068                                }
1069                                nbit++;
1070                        }
1071                }
1072                call->ackr_reason = 0;
1073                spin_unlock_bh(&call->lock);
1074
1075                pad = 0;
1076
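                     /* wire layout: the ack struct, nAcks ack-type bytes,
                      * three bytes of padding, then the ackinfo trailer
                      * (matching the nAcks + 3 offset used by
                      * rxrpc_extract_ackinfo() on reception)
                      */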
1077                iov[1].iov_base = &ack;
1078                iov[1].iov_len  = sizeof(ack);
1079                iov[2].iov_base = acks;
1080                iov[2].iov_len  = ack.nAcks;
1081                iov[3].iov_base = &pad;
1082                iov[3].iov_len  = 3;
1083                iov[4].iov_base = &ackinfo;
1084                iov[4].iov_len  = sizeof(ackinfo);
1085
1086                switch (ack.reason) {
1087                case RXRPC_ACK_REQUESTED:
1088                case RXRPC_ACK_DUPLICATE:
1089                case RXRPC_ACK_OUT_OF_SEQUENCE:
1090                case RXRPC_ACK_EXCEEDS_WINDOW:
1091                case RXRPC_ACK_NOSPACE:
1092                case RXRPC_ACK_PING:
1093                case RXRPC_ACK_PING_RESPONSE:
1094                        goto send_ACK_with_skew;
1095                case RXRPC_ACK_DELAY:
1096                case RXRPC_ACK_IDLE:
1097                        goto send_ACK;
1098                }
1099        }
1100
1101        /* handle completion of security negotiations on an incoming
1102         * connection */
1103        if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1104                _debug("secured");
1105                spin_lock_bh(&call->lock);
1106
1107                if (call->state == RXRPC_CALL_SERVER_SECURING) {
1108                        _debug("securing");
1109                        write_lock(&call->conn->lock);
1110                        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1111                            !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1112                                _debug("not released");
1113                                call->state = RXRPC_CALL_SERVER_ACCEPTING;
1114                                list_move_tail(&call->accept_link,
1115                                               &call->socket->acceptq);
1116                        }
1117                        write_unlock(&call->conn->lock);
1118                        read_lock(&call->state_lock);
1119                        if (call->state < RXRPC_CALL_COMPLETE)
1120                                set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1121                        read_unlock(&call->state_lock);
1122                }
1123
1124                spin_unlock_bh(&call->lock);
1125                if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1126                        goto maybe_reschedule;
1127        }
1128
1129        /* post a notification of an acceptable connection to the app */
1130        if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1131                _debug("post accept");
1132                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1133                                       0, false) < 0)
1134                        goto no_mem;
1135                clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1136                goto maybe_reschedule;
1137        }
1138
1139        /* handle incoming call acceptance */
1140        if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1141                _debug("accepted");
1142                ASSERTCMP(call->rx_data_post, ==, 0);
1143                call->rx_data_post = 1;
1144                read_lock_bh(&call->state_lock);
1145                if (call->state < RXRPC_CALL_COMPLETE)
1146                        set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1147                read_unlock_bh(&call->state_lock);
1148        }
1149
1150        /* drain the out of sequence received packet queue into the packet Rx
1151         * queue */
1152        if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
1153                while (call->rx_data_post == call->rx_first_oos)
1154                        if (rxrpc_drain_rx_oos_queue(call) < 0)
1155                                break;
1156                goto maybe_reschedule;
1157        }
1158
1159        /* other events may have been raised since we started checking */
1160        goto maybe_reschedule;
1161
1162send_ACK_with_skew:
1163        ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1164                            ntohl(ack.serial));
1165send_ACK:
1166        mtu = call->conn->trans->peer->if_mtu;
1167        mtu -= call->conn->trans->peer->hdrsize;
1168        ackinfo.maxMTU  = htonl(mtu);
1169        ackinfo.rwind   = htonl(32);
1170
1171        /* permit the peer to send us jumbo packets if it wants to */
1172        ackinfo.rxMTU   = htonl(5692);
1173        ackinfo.jumbo_max = htonl(4);
1174
1175        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1176        _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1177               ntohl(hdr.serial),
1178               ntohs(ack.maxSkew),
1179               ntohl(ack.firstPacket),
1180               ntohl(ack.previousPacket),
1181               ntohl(ack.serial),
1182               rxrpc_acks[ack.reason],
1183               ack.nAcks);
1184
1185        del_timer_sync(&call->ack_timer);
1186        if (ack.nAcks > 0)
1187                set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1188        goto send_message_2;
1189
1190send_message:
1191        _debug("send message");
1192
1193        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1194        _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1195send_message_2:
1196
1197        len = iov[0].iov_len;
1198        ioc = 1;
1199        if (iov[4].iov_len) {
1200                ioc = 5;
1201                len += iov[4].iov_len;
1202                len += iov[3].iov_len;
1203                len += iov[2].iov_len;
1204                len += iov[1].iov_len;
1205        } else if (iov[3].iov_len) {
1206                ioc = 4;
1207                len += iov[3].iov_len;
1208                len += iov[2].iov_len;
1209                len += iov[1].iov_len;
1210        } else if (iov[2].iov_len) {
1211                ioc = 3;
1212                len += iov[2].iov_len;
1213                len += iov[1].iov_len;
1214        } else if (iov[1].iov_len) {
1215                ioc = 2;
1216                len += iov[1].iov_len;
1217        }
1218
1219        ret = kernel_sendmsg(call->conn->trans->local->socket,
1220                             &msg, iov, ioc, len);
1221        if (ret < 0) {
1222                _debug("sendmsg failed: %d", ret);
1223                read_lock_bh(&call->state_lock);
1224                if (call->state < RXRPC_CALL_DEAD)
1225                        rxrpc_queue_call(call);
1226                read_unlock_bh(&call->state_lock);
1227                goto error;
1228        }
1229
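             /* the message went out, so clear the event that generated it
              * where appropriate and work out what follow-up the call needs
              */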
1230        switch (genbit) {
1231        case RXRPC_CALL_ABORT:
1232                clear_bit(genbit, &call->events);
1233                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1234                goto kill_ACKs;
1235
1236        case RXRPC_CALL_ACK_FINAL:
1237                write_lock_bh(&call->state_lock);
1238                if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1239                        call->state = RXRPC_CALL_COMPLETE;
1240                write_unlock_bh(&call->state_lock);
1241                goto kill_ACKs;
1242
1243        default:
1244                clear_bit(genbit, &call->events);
1245                switch (call->state) {
1246                case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1247                case RXRPC_CALL_CLIENT_RECV_REPLY:
1248                case RXRPC_CALL_SERVER_RECV_REQUEST:
1249                case RXRPC_CALL_SERVER_ACK_REQUEST:
1250                        _debug("start ACK timer");
1251                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1252                                          call->ackr_serial, false);
1253                default:
1254                        break;
1255                }
1256                goto maybe_reschedule;
1257        }
1258
1259kill_ACKs:
1260        del_timer_sync(&call->ack_timer);
1261        if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1262                rxrpc_put_call(call);
1263        clear_bit(RXRPC_CALL_ACK, &call->events);
1264
1265maybe_reschedule:
1266        if (call->events || !skb_queue_empty(&call->rx_queue)) {
1267                read_lock_bh(&call->state_lock);
1268                if (call->state < RXRPC_CALL_DEAD)
1269                        rxrpc_queue_call(call);
1270                read_unlock_bh(&call->state_lock);
1271        }
1272
1273        /* don't leave aborted connections on the accept queue */
1274        if (call->state >= RXRPC_CALL_COMPLETE &&
1275            !list_empty(&call->accept_link)) {
1276                _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1277                       call, call->events, call->flags,
1278                       ntohl(call->conn->cid));
1279
1280                read_lock_bh(&call->state_lock);
1281                if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1282                    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1283                        rxrpc_queue_call(call);
1284                read_unlock_bh(&call->state_lock);
1285        }
1286
1287error:
1288        clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1289        kfree(acks);
1290
1291        /* because we don't want two CPUs both processing the work item for one
1292         * call at the same time, we use a flag to note when it's busy; however
1293         * this means there's a race between clearing the flag and setting the
1294         * work pending bit and the work item being processed again */
1295        if (call->events && !work_pending(&call->processor)) {
1296                _debug("jumpstart %x", ntohl(call->conn->cid));
1297                rxrpc_queue_call(call);
1298        }
1299
1300        _leave("");
1301        return;
1302
1303no_mem:
1304        _debug("out of memory");
1305        goto maybe_reschedule;
1306}
1307