linux/net/rxrpc/ar-ack.c
<<
>>
Prefs
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/slab.h>
  17#include <linux/udp.h>
  18#include <net/sock.h>
  19#include <net/af_rxrpc.h>
  20#include "ar-internal.h"
  21
  22static unsigned int rxrpc_ack_defer = 1;
  23
  24static const char *const rxrpc_acks[] = {
  25        "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
  26        "-?-"
  27};
  28
  29static const s8 rxrpc_ack_priority[] = {
  30        [0]                             = 0,
  31        [RXRPC_ACK_DELAY]               = 1,
  32        [RXRPC_ACK_REQUESTED]           = 2,
  33        [RXRPC_ACK_IDLE]                = 3,
  34        [RXRPC_ACK_PING_RESPONSE]       = 4,
  35        [RXRPC_ACK_DUPLICATE]           = 5,
  36        [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
  37        [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
  38        [RXRPC_ACK_NOSPACE]             = 8,
  39};
  40
  41/*
  42 * propose an ACK be sent
  43 */
  44void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  45                         __be32 serial, bool immediate)
  46{
  47        unsigned long expiry;
  48        s8 prior = rxrpc_ack_priority[ack_reason];
  49
  50        ASSERTCMP(prior, >, 0);
  51
  52        _enter("{%d},%s,%%%x,%u",
  53               call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
  54               immediate);
  55
  56        if (prior < rxrpc_ack_priority[call->ackr_reason]) {
  57                if (immediate)
  58                        goto cancel_timer;
  59                return;
  60        }
  61
  62        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
  63         * numbers */
  64        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
  65                if (prior <= 4)
  66                        call->ackr_serial = serial;
  67                if (immediate)
  68                        goto cancel_timer;
  69                return;
  70        }
  71
  72        call->ackr_reason = ack_reason;
  73        call->ackr_serial = serial;
  74
  75        switch (ack_reason) {
  76        case RXRPC_ACK_DELAY:
  77                _debug("run delay timer");
  78                call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
  79                add_timer(&call->ack_timer);
  80                return;
  81
  82        case RXRPC_ACK_IDLE:
  83                if (!immediate) {
  84                        _debug("run defer timer");
  85                        expiry = 1;
  86                        goto run_timer;
  87                }
  88                goto cancel_timer;
  89
  90        case RXRPC_ACK_REQUESTED:
  91                if (!rxrpc_ack_defer)
  92                        goto cancel_timer;
  93                if (!immediate || serial == cpu_to_be32(1)) {
  94                        _debug("run defer timer");
  95                        expiry = rxrpc_ack_defer;
  96                        goto run_timer;
  97                }
  98
  99        default:
 100                _debug("immediate ACK");
 101                goto cancel_timer;
 102        }
 103
 104run_timer:
 105        expiry += jiffies;
 106        if (!timer_pending(&call->ack_timer) ||
 107            time_after(call->ack_timer.expires, expiry))
 108                mod_timer(&call->ack_timer, expiry);
 109        return;
 110
 111cancel_timer:
 112        _debug("cancel timer %%%u", ntohl(serial));
 113        try_to_del_timer_sync(&call->ack_timer);
 114        read_lock_bh(&call->state_lock);
 115        if (call->state <= RXRPC_CALL_COMPLETE &&
 116            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 117                rxrpc_queue_call(call);
 118        read_unlock_bh(&call->state_lock);
 119}
 120
 121/*
 122 * propose an ACK be sent, locking the call structure
 123 */
 124void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 125                       __be32 serial, bool immediate)
 126{
 127        s8 prior = rxrpc_ack_priority[ack_reason];
 128
 129        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 130                spin_lock_bh(&call->lock);
 131                __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 132                spin_unlock_bh(&call->lock);
 133        }
 134}
 135
 136/*
 137 * set the resend timer
 138 */
 139static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 140                             unsigned long resend_at)
 141{
 142        read_lock_bh(&call->state_lock);
 143        if (call->state >= RXRPC_CALL_COMPLETE)
 144                resend = 0;
 145
 146        if (resend & 1) {
 147                _debug("SET RESEND");
 148                set_bit(RXRPC_CALL_RESEND, &call->events);
 149        }
 150
 151        if (resend & 2) {
 152                _debug("MODIFY RESEND TIMER");
 153                set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 154                mod_timer(&call->resend_timer, resend_at);
 155        } else {
 156                _debug("KILL RESEND TIMER");
 157                del_timer_sync(&call->resend_timer);
 158                clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 159                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 160        }
 161        read_unlock_bh(&call->state_lock);
 162}
 163
 164/*
 165 * resend packets
 166 */
 167static void rxrpc_resend(struct rxrpc_call *call)
 168{
 169        struct rxrpc_skb_priv *sp;
 170        struct rxrpc_header *hdr;
 171        struct sk_buff *txb;
 172        unsigned long *p_txb, resend_at;
 173        int loop, stop;
 174        u8 resend;
 175
 176        _enter("{%d,%d,%d,%d},",
 177               call->acks_hard, call->acks_unacked,
 178               atomic_read(&call->sequence),
 179               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 180
 181        stop = 0;
 182        resend = 0;
 183        resend_at = 0;
 184
 185        for (loop = call->acks_tail;
 186             loop != call->acks_head || stop;
 187             loop = (loop + 1) &  (call->acks_winsz - 1)
 188             ) {
 189                p_txb = call->acks_window + loop;
 190                smp_read_barrier_depends();
 191                if (*p_txb & 1)
 192                        continue;
 193
 194                txb = (struct sk_buff *) *p_txb;
 195                sp = rxrpc_skb(txb);
 196
 197                if (sp->need_resend) {
 198                        sp->need_resend = false;
 199
 200                        /* each Tx packet has a new serial number */
 201                        sp->hdr.serial =
 202                                htonl(atomic_inc_return(&call->conn->serial));
 203
 204                        hdr = (struct rxrpc_header *) txb->head;
 205                        hdr->serial = sp->hdr.serial;
 206
 207                        _proto("Tx DATA %%%u { #%d }",
 208                               ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 209                        if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 210                                stop = 0;
 211                                sp->resend_at = jiffies + 3;
 212                        } else {
 213                                sp->resend_at =
 214                                        jiffies + rxrpc_resend_timeout * HZ;
 215                        }
 216                }
 217
 218                if (time_after_eq(jiffies + 1, sp->resend_at)) {
 219                        sp->need_resend = true;
 220                        resend |= 1;
 221                } else if (resend & 2) {
 222                        if (time_before(sp->resend_at, resend_at))
 223                                resend_at = sp->resend_at;
 224                } else {
 225                        resend_at = sp->resend_at;
 226                        resend |= 2;
 227                }
 228        }
 229
 230        rxrpc_set_resend(call, resend, resend_at);
 231        _leave("");
 232}
 233
 234/*
 235 * handle resend timer expiry
 236 */
 237static void rxrpc_resend_timer(struct rxrpc_call *call)
 238{
 239        struct rxrpc_skb_priv *sp;
 240        struct sk_buff *txb;
 241        unsigned long *p_txb, resend_at;
 242        int loop;
 243        u8 resend;
 244
 245        _enter("%d,%d,%d",
 246               call->acks_tail, call->acks_unacked, call->acks_head);
 247
 248        if (call->state >= RXRPC_CALL_COMPLETE)
 249                return;
 250
 251        resend = 0;
 252        resend_at = 0;
 253
 254        for (loop = call->acks_unacked;
 255             loop != call->acks_head;
 256             loop = (loop + 1) &  (call->acks_winsz - 1)
 257             ) {
 258                p_txb = call->acks_window + loop;
 259                smp_read_barrier_depends();
 260                txb = (struct sk_buff *) (*p_txb & ~1);
 261                sp = rxrpc_skb(txb);
 262
 263                ASSERT(!(*p_txb & 1));
 264
 265                if (sp->need_resend) {
 266                        ;
 267                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 268                        sp->need_resend = true;
 269                        resend |= 1;
 270                } else if (resend & 2) {
 271                        if (time_before(sp->resend_at, resend_at))
 272                                resend_at = sp->resend_at;
 273                } else {
 274                        resend_at = sp->resend_at;
 275                        resend |= 2;
 276                }
 277        }
 278
 279        rxrpc_set_resend(call, resend, resend_at);
 280        _leave("");
 281}
 282
 283/*
 284 * process soft ACKs of our transmitted packets
 285 * - these indicate packets the peer has or has not received, but hasn't yet
 286 *   given to the consumer, and so can still be discarded and re-requested
 287 */
 288static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 289                                   struct rxrpc_ackpacket *ack,
 290                                   struct sk_buff *skb)
 291{
 292        struct rxrpc_skb_priv *sp;
 293        struct sk_buff *txb;
 294        unsigned long *p_txb, resend_at;
 295        int loop;
 296        u8 sacks[RXRPC_MAXACKS], resend;
 297
 298        _enter("{%d,%d},{%d},",
 299               call->acks_hard,
 300               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 301               ack->nAcks);
 302
 303        if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 304                goto protocol_error;
 305
 306        resend = 0;
 307        resend_at = 0;
 308        for (loop = 0; loop < ack->nAcks; loop++) {
 309                p_txb = call->acks_window;
 310                p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 311                smp_read_barrier_depends();
 312                txb = (struct sk_buff *) (*p_txb & ~1);
 313                sp = rxrpc_skb(txb);
 314
 315                switch (sacks[loop]) {
 316                case RXRPC_ACK_TYPE_ACK:
 317                        sp->need_resend = false;
 318                        *p_txb |= 1;
 319                        break;
 320                case RXRPC_ACK_TYPE_NACK:
 321                        sp->need_resend = true;
 322                        *p_txb &= ~1;
 323                        resend = 1;
 324                        break;
 325                default:
 326                        _debug("Unsupported ACK type %d", sacks[loop]);
 327                        goto protocol_error;
 328                }
 329        }
 330
 331        smp_mb();
 332        call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 333
 334        /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 335         * have been received or processed yet by the far end */
 336        for (loop = call->acks_unacked;
 337             loop != call->acks_head;
 338             loop = (loop + 1) &  (call->acks_winsz - 1)
 339             ) {
 340                p_txb = call->acks_window + loop;
 341                smp_read_barrier_depends();
 342                txb = (struct sk_buff *) (*p_txb & ~1);
 343                sp = rxrpc_skb(txb);
 344
 345                if (*p_txb & 1) {
 346                        /* packet must have been discarded */
 347                        sp->need_resend = true;
 348                        *p_txb &= ~1;
 349                        resend |= 1;
 350                } else if (sp->need_resend) {
 351                        ;
 352                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 353                        sp->need_resend = true;
 354                        resend |= 1;
 355                } else if (resend & 2) {
 356                        if (time_before(sp->resend_at, resend_at))
 357                                resend_at = sp->resend_at;
 358                } else {
 359                        resend_at = sp->resend_at;
 360                        resend |= 2;
 361                }
 362        }
 363
 364        rxrpc_set_resend(call, resend, resend_at);
 365        _leave(" = 0");
 366        return 0;
 367
 368protocol_error:
 369        _leave(" = -EPROTO");
 370        return -EPROTO;
 371}
 372
 373/*
 374 * discard hard-ACK'd packets from the Tx window
 375 */
 376static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 377{
 378        unsigned long _skb;
 379        int tail = call->acks_tail, old_tail;
 380        int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 381
 382        _enter("{%u,%u},%u", call->acks_hard, win, hard);
 383
 384        ASSERTCMP(hard - call->acks_hard, <=, win);
 385
 386        while (call->acks_hard < hard) {
 387                smp_read_barrier_depends();
 388                _skb = call->acks_window[tail] & ~1;
 389                rxrpc_free_skb((struct sk_buff *) _skb);
 390                old_tail = tail;
 391                tail = (tail + 1) & (call->acks_winsz - 1);
 392                call->acks_tail = tail;
 393                if (call->acks_unacked == old_tail)
 394                        call->acks_unacked = tail;
 395                call->acks_hard++;
 396        }
 397
 398        wake_up(&call->tx_waitq);
 399}
 400
 401/*
 402 * clear the Tx window in the event of a failure
 403 */
 404static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 405{
 406        rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 407}
 408
 409/*
 410 * drain the out of sequence received packet queue into the packet Rx queue
 411 */
 412static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 413{
 414        struct rxrpc_skb_priv *sp;
 415        struct sk_buff *skb;
 416        bool terminal;
 417        int ret;
 418
 419        _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 420
 421        spin_lock_bh(&call->lock);
 422
 423        ret = -ECONNRESET;
 424        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 425                goto socket_unavailable;
 426
 427        skb = skb_dequeue(&call->rx_oos_queue);
 428        if (skb) {
 429                sp = rxrpc_skb(skb);
 430
 431                _debug("drain OOS packet %d [%d]",
 432                       ntohl(sp->hdr.seq), call->rx_first_oos);
 433
 434                if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 435                        skb_queue_head(&call->rx_oos_queue, skb);
 436                        call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 437                        _debug("requeue %p {%u}", skb, call->rx_first_oos);
 438                } else {
 439                        skb->mark = RXRPC_SKB_MARK_DATA;
 440                        terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 441                                !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 442                        ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 443                        BUG_ON(ret < 0);
 444                        _debug("drain #%u", call->rx_data_post);
 445                        call->rx_data_post++;
 446
 447                        /* find out what the next packet is */
 448                        skb = skb_peek(&call->rx_oos_queue);
 449                        if (skb)
 450                                call->rx_first_oos =
 451                                        ntohl(rxrpc_skb(skb)->hdr.seq);
 452                        else
 453                                call->rx_first_oos = 0;
 454                        _debug("peek %p {%u}", skb, call->rx_first_oos);
 455                }
 456        }
 457
 458        ret = 0;
 459socket_unavailable:
 460        spin_unlock_bh(&call->lock);
 461        _leave(" = %d", ret);
 462        return ret;
 463}
 464
 465/*
 466 * insert an out of sequence packet into the buffer
 467 */
 468static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 469                                    struct sk_buff *skb)
 470{
 471        struct rxrpc_skb_priv *sp, *psp;
 472        struct sk_buff *p;
 473        u32 seq;
 474
 475        sp = rxrpc_skb(skb);
 476        seq = ntohl(sp->hdr.seq);
 477        _enter(",,{%u}", seq);
 478
 479        skb->destructor = rxrpc_packet_destructor;
 480        ASSERTCMP(sp->call, ==, NULL);
 481        sp->call = call;
 482        rxrpc_get_call(call);
 483
 484        /* insert into the buffer in sequence order */
 485        spin_lock_bh(&call->lock);
 486
 487        skb_queue_walk(&call->rx_oos_queue, p) {
 488                psp = rxrpc_skb(p);
 489                if (ntohl(psp->hdr.seq) > seq) {
 490                        _debug("insert oos #%u before #%u",
 491                               seq, ntohl(psp->hdr.seq));
 492                        skb_insert(p, skb, &call->rx_oos_queue);
 493                        goto inserted;
 494                }
 495        }
 496
 497        _debug("append oos #%u", seq);
 498        skb_queue_tail(&call->rx_oos_queue, skb);
 499inserted:
 500
 501        /* we might now have a new front to the queue */
 502        if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 503                call->rx_first_oos = seq;
 504
 505        read_lock(&call->state_lock);
 506        if (call->state < RXRPC_CALL_COMPLETE &&
 507            call->rx_data_post == call->rx_first_oos) {
 508                _debug("drain rx oos now");
 509                set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 510        }
 511        read_unlock(&call->state_lock);
 512
 513        spin_unlock_bh(&call->lock);
 514        _leave(" [stored #%u]", call->rx_first_oos);
 515}
 516
 517/*
 518 * clear the Tx window on final ACK reception
 519 */
 520static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 521{
 522        struct rxrpc_skb_priv *sp;
 523        struct sk_buff *skb;
 524        unsigned long _skb, *acks_window;
 525        u8 winsz = call->acks_winsz;
 526        int tail;
 527
 528        acks_window = call->acks_window;
 529        call->acks_window = NULL;
 530
 531        while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 532                tail = call->acks_tail;
 533                smp_read_barrier_depends();
 534                _skb = acks_window[tail] & ~1;
 535                smp_mb();
 536                call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 537
 538                skb = (struct sk_buff *) _skb;
 539                sp = rxrpc_skb(skb);
 540                _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 541                rxrpc_free_skb(skb);
 542        }
 543
 544        kfree(acks_window);
 545}
 546
 547/*
 548 * process the extra information that may be appended to an ACK packet
 549 */
 550static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 551                                  unsigned int latest, int nAcks)
 552{
 553        struct rxrpc_ackinfo ackinfo;
 554        struct rxrpc_peer *peer;
 555        unsigned int mtu;
 556
 557        if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 558                _leave(" [no ackinfo]");
 559                return;
 560        }
 561
 562        _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 563               latest,
 564               ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 565               ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 566
 567        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 568
 569        peer = call->conn->trans->peer;
 570        if (mtu < peer->maxdata) {
 571                spin_lock_bh(&peer->lock);
 572                peer->maxdata = mtu;
 573                peer->mtu = mtu + peer->hdrsize;
 574                spin_unlock_bh(&peer->lock);
 575                _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 576        }
 577}
 578
 579/*
 580 * process packets in the reception queue
 581 */
 582static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 583                                  u32 *_abort_code)
 584{
 585        struct rxrpc_ackpacket ack;
 586        struct rxrpc_skb_priv *sp;
 587        struct sk_buff *skb;
 588        bool post_ACK;
 589        int latest;
 590        u32 hard, tx;
 591
 592        _enter("");
 593
 594process_further:
 595        skb = skb_dequeue(&call->rx_queue);
 596        if (!skb)
 597                return -EAGAIN;
 598
 599        _net("deferred skb %p", skb);
 600
 601        sp = rxrpc_skb(skb);
 602
 603        _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 604
 605        post_ACK = false;
 606
 607        switch (sp->hdr.type) {
 608                /* data packets that wind up here have been received out of
 609                 * order, need security processing or are jumbo packets */
 610        case RXRPC_PACKET_TYPE_DATA:
 611                _proto("OOSQ DATA %%%u { #%u }",
 612                       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 613
 614                /* secured packets must be verified and possibly decrypted */
 615                if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 616                        goto protocol_error;
 617
 618                rxrpc_insert_oos_packet(call, skb);
 619                goto process_further;
 620
 621                /* partial ACK to process */
 622        case RXRPC_PACKET_TYPE_ACK:
 623                if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 624                        _debug("extraction failure");
 625                        goto protocol_error;
 626                }
 627                if (!skb_pull(skb, sizeof(ack)))
 628                        BUG();
 629
 630                latest = ntohl(sp->hdr.serial);
 631                hard = ntohl(ack.firstPacket);
 632                tx = atomic_read(&call->sequence);
 633
 634                _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 635                       latest,
 636                       ntohs(ack.maxSkew),
 637                       hard,
 638                       ntohl(ack.previousPacket),
 639                       ntohl(ack.serial),
 640                       rxrpc_acks[ack.reason],
 641                       ack.nAcks);
 642
 643                rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 644
 645                if (ack.reason == RXRPC_ACK_PING) {
 646                        _proto("Rx ACK %%%u PING Request", latest);
 647                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 648                                          sp->hdr.serial, true);
 649                }
 650
 651                /* discard any out-of-order or duplicate ACKs */
 652                if (latest - call->acks_latest <= 0) {
 653                        _debug("discard ACK %d <= %d",
 654                               latest, call->acks_latest);
 655                        goto discard;
 656                }
 657                call->acks_latest = latest;
 658
 659                if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 660                    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 661                    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 662                    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 663                        goto discard;
 664
 665                _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 666
 667                if (hard > 0) {
 668                        if (hard - 1 > tx) {
 669                                _debug("hard-ACK'd packet %d not transmitted"
 670                                       " (%d top)",
 671                                       hard - 1, tx);
 672                                goto protocol_error;
 673                        }
 674
 675                        if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 676                             call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 677                            hard > tx)
 678                                goto all_acked;
 679
 680                        smp_rmb();
 681                        rxrpc_rotate_tx_window(call, hard - 1);
 682                }
 683
 684                if (ack.nAcks > 0) {
 685                        if (hard - 1 + ack.nAcks > tx) {
 686                                _debug("soft-ACK'd packet %d+%d not"
 687                                       " transmitted (%d top)",
 688                                       hard - 1, ack.nAcks, tx);
 689                                goto protocol_error;
 690                        }
 691
 692                        if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 693                                goto protocol_error;
 694                }
 695                goto discard;
 696
 697                /* complete ACK to process */
 698        case RXRPC_PACKET_TYPE_ACKALL:
 699                goto all_acked;
 700
 701                /* abort and busy are handled elsewhere */
 702        case RXRPC_PACKET_TYPE_BUSY:
 703        case RXRPC_PACKET_TYPE_ABORT:
 704                BUG();
 705
 706                /* connection level events - also handled elsewhere */
 707        case RXRPC_PACKET_TYPE_CHALLENGE:
 708        case RXRPC_PACKET_TYPE_RESPONSE:
 709        case RXRPC_PACKET_TYPE_DEBUG:
 710                BUG();
 711        }
 712
 713        /* if we've had a hard ACK that covers all the packets we've sent, then
 714         * that ends that phase of the operation */
 715all_acked:
 716        write_lock_bh(&call->state_lock);
 717        _debug("ack all %d", call->state);
 718
 719        switch (call->state) {
 720        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 721                call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 722                break;
 723        case RXRPC_CALL_SERVER_AWAIT_ACK:
 724                _debug("srv complete");
 725                call->state = RXRPC_CALL_COMPLETE;
 726                post_ACK = true;
 727                break;
 728        case RXRPC_CALL_CLIENT_SEND_REQUEST:
 729        case RXRPC_CALL_SERVER_RECV_REQUEST:
 730                goto protocol_error_unlock; /* can't occur yet */
 731        default:
 732                write_unlock_bh(&call->state_lock);
 733                goto discard; /* assume packet left over from earlier phase */
 734        }
 735
 736        write_unlock_bh(&call->state_lock);
 737
 738        /* if all the packets we sent are hard-ACK'd, then we can discard
 739         * whatever we've got left */
 740        _debug("clear Tx %d",
 741               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 742
 743        del_timer_sync(&call->resend_timer);
 744        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 745        clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 746
 747        if (call->acks_window)
 748                rxrpc_zap_tx_window(call);
 749
 750        if (post_ACK) {
 751                /* post the final ACK message for userspace to pick up */
 752                _debug("post ACK");
 753                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 754                sp->call = call;
 755                rxrpc_get_call(call);
 756                spin_lock_bh(&call->lock);
 757                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 758                        BUG();
 759                spin_unlock_bh(&call->lock);
 760                goto process_further;
 761        }
 762
 763discard:
 764        rxrpc_free_skb(skb);
 765        goto process_further;
 766
 767protocol_error_unlock:
 768        write_unlock_bh(&call->state_lock);
 769protocol_error:
 770        rxrpc_free_skb(skb);
 771        _leave(" = -EPROTO");
 772        return -EPROTO;
 773}
 774
 775/*
 776 * post a message to the socket Rx queue for recvmsg() to pick up
 777 */
 778static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 779                              bool fatal)
 780{
 781        struct rxrpc_skb_priv *sp;
 782        struct sk_buff *skb;
 783        int ret;
 784
 785        _enter("{%d,%lx},%u,%u,%d",
 786               call->debug_id, call->flags, mark, error, fatal);
 787
 788        /* remove timers and things for fatal messages */
 789        if (fatal) {
 790                del_timer_sync(&call->resend_timer);
 791                del_timer_sync(&call->ack_timer);
 792                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 793        }
 794
 795        if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 796            !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 797                _leave("[no userid]");
 798                return 0;
 799        }
 800
 801        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 802                skb = alloc_skb(0, GFP_NOFS);
 803                if (!skb)
 804                        return -ENOMEM;
 805
 806                rxrpc_new_skb(skb);
 807
 808                skb->mark = mark;
 809
 810                sp = rxrpc_skb(skb);
 811                memset(sp, 0, sizeof(*sp));
 812                sp->error = error;
 813                sp->call = call;
 814                rxrpc_get_call(call);
 815
 816                spin_lock_bh(&call->lock);
 817                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 818                spin_unlock_bh(&call->lock);
 819                BUG_ON(ret < 0);
 820        }
 821
 822        return 0;
 823}
 824
 825/*
 826 * handle background processing of incoming call packets and ACK / abort
 827 * generation
     *
     * This is a work-item handler: @work is the ->processor member embedded in
     * a struct rxrpc_call, from which the call is recovered with container_of().
     *
     * A single pass tests the bits in call->events in a fixed order: terminal
     * conditions first (release, network error, connection abort, busy
     * rejection, local abort, final ACK, remote busy/abort, lifetime expiry),
     * then the queued Rx packets, resends, ordinary ACK generation and finally
     * the secured / post-accept / accepted / out-of-sequence-drain events.
     *
     * This function itself calls kernel_sendmsg() at most once per pass (for an
     * ACK, ABORT or BUSY packet).  Before leaving it requeues the call if any
     * event bit is still set or the Rx queue is not empty.
     *
     * RXRPC_CALL_PROC_BUSY in call->flags is used as a "being processed" marker:
     * if it is already set on entry the function returns without doing anything.
 828 */
 829void rxrpc_process_call(struct work_struct *work)
 830{
 831        struct rxrpc_call *call =
 832                container_of(work, struct rxrpc_call, processor);
 833        struct rxrpc_ackpacket ack;
 834        struct rxrpc_ackinfo ackinfo;
 835        struct rxrpc_header hdr;
 836        struct msghdr msg;
 837        struct kvec iov[5];
 838        unsigned long bits;
 839        __be32 data, pad;
 840        size_t len;
 841        int genbit, loop, nbit, ioc, ret, mtu;
 842        u32 abort_code = RX_PROTOCOL_ERROR;
            /* acks stays NULL unless the ordinary-ACK path below allocates it; the
             * unconditional kfree() at the error label depends on that */
 843        u8 *acks = NULL;
 844
 845        //printk("\n--------------------\n");
 846        _enter("{%d,%s,%lx} [%lu]",
 847               call->debug_id, rxrpc_call_states[call->state], call->events,
 848               (jiffies - call->creation_jif) / (HZ / 10));
 849
            /* only one instance may process a given call at a time; if the flag
             * was already set, back out without touching the call */
 850        if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 851                _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 852                return;
 853        }
 854
 855        /* there's a good chance we're going to have to send a message, so set
 856         * one up in advance */
 857        msg.msg_name    = &call->conn->trans->peer->srx.transport.sin;
 858        msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
 859        msg.msg_control = NULL;
 860        msg.msg_controllen = 0;
 861        msg.msg_flags   = 0;
 862
            /* the header defaults to an ACK packet; hdr.type is overridden below
             * for BUSY and ABORT, and hdr.serial is only filled in immediately
             * before transmission */
 863        hdr.epoch       = call->conn->epoch;
 864        hdr.cid         = call->cid;
 865        hdr.callNumber  = call->call_id;
 866        hdr.seq         = 0;
 867        hdr.type        = RXRPC_PACKET_TYPE_ACK;
 868        hdr.flags       = call->conn->out_clientflag;
 869        hdr.userStatus  = 0;
 870        hdr.securityIndex = call->conn->security_ix;
 871        hdr._rsvd       = 0;
 872        hdr.serviceId   = call->conn->service_id;
 873
            /* unused iov slots keep a zero length; send_message_2 uses the lengths
             * to work out how many slots to hand to kernel_sendmsg() */
 874        memset(iov, 0, sizeof(iov));
 875        iov[0].iov_base = &hdr;
 876        iov[0].iov_len  = sizeof(hdr);
 877
 878        /* deal with events of a final nature */
            /* note that processing carries on with the remaining events after the
             * release has been performed */
 879        if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 880                rxrpc_release_call(call);
 881                clear_bit(RXRPC_CALL_RELEASE, &call->events);
 882        }
 883
            /* a received network error takes priority over a pending connection
             * abort, busy rejection or local abort, so those events are discarded */
 884        if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 885                int error;
 886
 887                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 888                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 889                clear_bit(RXRPC_CALL_ABORT, &call->events);
 890
 891                error = call->conn->trans->peer->net_error;
 892                _debug("post net error %d", error);
 893
                    /* the event bit is only cleared once the message has been
                     * posted, so a failed post leaves it set for a later pass */
 894                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 895                                       error, true) < 0)
 896                        goto no_mem;
 897                clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 898                goto kill_ACKs;
 899        }
 900
            /* the connection was aborted: report call->conn->error to the app as a
             * local error; pending busy-rejection and local-abort events are
             * dropped */
 901        if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 902                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 903
 904                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 905                clear_bit(RXRPC_CALL_ABORT, &call->events);
 906
 907                _debug("post conn abort");
 908
 909                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 910                                       call->conn->error, true) < 0)
 911                        goto no_mem;
 912                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 913                goto kill_ACKs;
 914        }
 915
            /* send a BUSY packet consisting of the header only: no iov slot beyond
             * iov[0] is filled in on this path */
 916        if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 917                hdr.type = RXRPC_PACKET_TYPE_BUSY;
 918                genbit = RXRPC_CALL_REJECT_BUSY;
 919                goto send_message;
 920        }
 921
            /* local abort: tell the app first (ECONNABORTED), then send an ABORT
             * packet whose payload is the abort code in network byte order */
 922        if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 923                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 924
 925                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 926                                       ECONNABORTED, true) < 0)
 927                        goto no_mem;
 928                hdr.type = RXRPC_PACKET_TYPE_ABORT;
 929                data = htonl(call->abort_code);
 930                iov[1].iov_base = &data;
 931                iov[1].iov_len = sizeof(data);
 932                genbit = RXRPC_CALL_ABORT;
 933                goto send_message;
 934        }
 935
            /* final ACK: reason IDLE with an empty soft-ACK array; any pending
             * ordinary ACK reason is reset */
 936        if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 937                genbit = RXRPC_CALL_ACK_FINAL;
 938
 939                ack.bufferSpace = htons(8);
 940                ack.maxSkew     = 0;
 941                ack.serial      = 0;
 942                ack.reason      = RXRPC_ACK_IDLE;
 943                ack.nAcks       = 0;
 944                call->ackr_reason = 0;
 945
                    /* the serial set to 0 above is only a placeholder; the real
                     * values are sampled under call->lock */
 946                spin_lock_bh(&call->lock);
 947                ack.serial = call->ackr_serial;
 948                ack.previousPacket = call->ackr_prev_seq;
 949                ack.firstPacket = htonl(call->rx_data_eaten + 1);
 950                spin_unlock_bh(&call->lock);
 951
 952                pad = 0;
 953
                    /* packet layout: header, ACK body, 3 zero bytes taken from pad,
                     * then the ackinfo trailer (filled in at send_ACK) */
 954                iov[1].iov_base = &ack;
 955                iov[1].iov_len  = sizeof(ack);
 956                iov[2].iov_base = &pad;
 957                iov[2].iov_len  = 3;
 958                iov[3].iov_base = &ackinfo;
 959                iov[3].iov_len  = sizeof(ackinfo);
 960                goto send_ACK;
 961        }
 962
            /* the peer sent BUSY or ABORT: discard the Tx window and report it to
             * the app; if both bits are set the abort mark is the one used */
 963        if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
 964                            (1 << RXRPC_CALL_RCVD_ABORT))
 965            ) {
 966                u32 mark;
 967
 968                if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
 969                        mark = RXRPC_SKB_MARK_REMOTE_ABORT;
 970                else
 971                        mark = RXRPC_SKB_MARK_BUSY;
 972
 973                _debug("post abort/busy");
 974                rxrpc_clear_tx_window(call);
 975                if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
 976                        goto no_mem;
 977
 978                clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
 979                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 980                goto kill_ACKs;
 981        }
 982
 983        if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
 984                _debug("do implicit ackall");
 985                rxrpc_clear_tx_window(call);
 986        }
 987
            /* lifetime expiry: if the call has not gone beyond COMPLETE, mark it
             * locally aborted with RX_CALL_TIMEOUT and raise the ABORT event so
             * that a later pass transmits the abort; ETIME is posted to the app
             * whether or not the state was changed */
 988        if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
 989                write_lock_bh(&call->state_lock);
 990                if (call->state <= RXRPC_CALL_COMPLETE) {
 991                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
 992                        call->abort_code = RX_CALL_TIMEOUT;
 993                        set_bit(RXRPC_CALL_ABORT, &call->events);
 994                }
 995                write_unlock_bh(&call->state_lock);
 996
 997                _debug("post timeout");
 998                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 999                                       ETIME, true) < 0)
1000                        goto no_mem;
1001
1002                clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1003                goto kill_ACKs;
1004        }
1005
1006        /* deal with assorted inbound messages */
            /* abort_code starts out as RX_PROTOCOL_ERROR and is passed by pointer,
             * presumably so the Rx handler can substitute a more specific code -
             * confirm in rxrpc_process_rx_queue(); return values not listed in
             * the switch are ignored */
1007        if (!skb_queue_empty(&call->rx_queue)) {
1008                switch (rxrpc_process_rx_queue(call, &abort_code)) {
1009                case 0:
1010                case -EAGAIN:
1011                        break;
1012                case -ENOMEM:
1013                        goto no_mem;
1014                case -EKEYEXPIRED:
1015                case -EKEYREJECTED:
1016                case -EPROTO:
1017                        rxrpc_abort_call(call, abort_code);
1018                        goto kill_ACKs;
1019                }
1020        }
1021
1022        /* handle resending */
1023        if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1024                rxrpc_resend_timer(call);
1025        if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1026                rxrpc_resend(call);
1027
1028        /* consider sending an ordinary ACK */
1029        if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1030                _debug("send ACK: window: %d - %d { %lx }",
1031                       call->rx_data_eaten, call->ackr_win_top,
1032                       call->ackr_window[0]);
1033
1034                if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1035                    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1036                        /* ACK by sending reply DATA packet in this state */
1037                        clear_bit(RXRPC_CALL_ACK, &call->events);
1038                        goto maybe_reschedule;
1039                }
1040
1041                genbit = RXRPC_CALL_ACK;
1042
                    /* soft-ACK array: one zeroed byte per sequence number between
                     * rx_data_eaten and ackr_win_top.
                     * NOTE(review): the loop below indexes this buffer by bit
                     * position within ackr_window, so it relies on no bit being
                     * set at or beyond (ackr_win_top - rx_data_eaten) - confirm
                     * against the code that maintains ackr_window */
1043                acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1044                               GFP_NOFS);
1045                if (!acks)
1046                        goto no_mem;
1047
1048                //hdr.flags     = RXRPC_SLOW_START_OK;
1049                ack.bufferSpace = htons(8);
1050                ack.maxSkew     = 0;
1051                ack.serial      = 0;
1052                ack.reason      = 0;
1053
                    /* sample the ACK state and consume the pending reason under
                     * call->lock; serial and reason set above are placeholders */
1054                spin_lock_bh(&call->lock);
1055                ack.reason = call->ackr_reason;
1056                ack.serial = call->ackr_serial;
1057                ack.previousPacket = call->ackr_prev_seq;
1058                ack.firstPacket = htonl(call->rx_data_eaten + 1);
1059
                    /* walk each word of the window bitmap from its lowest bit
                     * upwards, stopping a word early once no set bits remain in
                     * it; every set bit marks its slot as ACK'd and nAcks ends up
                     * as the index of the highest set bit plus one.  Slots for
                     * clear bits keep the zero byte written by kzalloc() */
1060                ack.nAcks = 0;
1061                for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1062                        nbit = loop * BITS_PER_LONG;
1063                        for (bits = call->ackr_window[loop]; bits; bits >>= 1
1064                             ) {
1065                                _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1066                                if (bits & 1) {
1067                                        acks[nbit] = RXRPC_ACK_TYPE_ACK;
1068                                        ack.nAcks = nbit + 1;
1069                                }
1070                                nbit++;
1071                        }
1072                }
1073                call->ackr_reason = 0;
1074                spin_unlock_bh(&call->lock);
1075
1076                pad = 0;
1077
                    /* packet layout: header, ACK body, nAcks soft-ACK bytes, 3 zero
                     * bytes taken from pad, then the ackinfo trailer (filled in at
                     * send_ACK) */
1078                iov[1].iov_base = &ack;
1079                iov[1].iov_len  = sizeof(ack);
1080                iov[2].iov_base = acks;
1081                iov[2].iov_len  = ack.nAcks;
1082                iov[3].iov_base = &pad;
1083                iov[3].iov_len  = 3;
1084                iov[4].iov_base = &ackinfo;
1085                iov[4].iov_len  = sizeof(ackinfo);
1086
                    /* DELAY and IDLE ACKs go out with maxSkew left at zero; the
                     * other listed reasons have it computed first.  A reason not
                     * listed here (including 0) drops out of the switch: nothing
                     * is sent from this branch, the ACK event bit is left set and
                     * processing continues with the checks below */
1087                switch (ack.reason) {
1088                case RXRPC_ACK_REQUESTED:
1089                case RXRPC_ACK_DUPLICATE:
1090                case RXRPC_ACK_OUT_OF_SEQUENCE:
1091                case RXRPC_ACK_EXCEEDS_WINDOW:
1092                case RXRPC_ACK_NOSPACE:
1093                case RXRPC_ACK_PING:
1094                case RXRPC_ACK_PING_RESPONSE:
1095                        goto send_ACK_with_skew;
1096                case RXRPC_ACK_DELAY:
1097                case RXRPC_ACK_IDLE:
1098                        goto send_ACK;
1099                }
1100        }
1101
1102        /* handle completion of security negotiations on an incoming
1103         * connection */
1104        if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1105                _debug("secured");
1106                spin_lock_bh(&call->lock);
1107
1108                if (call->state == RXRPC_CALL_SERVER_SECURING) {
1109                        _debug("securing");
                            /* move the call onto the socket's accept queue unless
                             * it has been, or is about to be, released */
1110                        write_lock(&call->conn->lock);
1111                        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1112                            !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1113                                _debug("not released");
1114                                call->state = RXRPC_CALL_SERVER_ACCEPTING;
1115                                list_move_tail(&call->accept_link,
1116                                               &call->socket->acceptq);
1117                        }
1118                        write_unlock(&call->conn->lock);
                            /* only ask for the app to be notified if the call has
                             * not already reached COMPLETE or beyond */
1119                        read_lock(&call->state_lock);
1120                        if (call->state < RXRPC_CALL_COMPLETE)
1121                                set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1122                        read_unlock(&call->state_lock);
1123                }
1124
1125                spin_unlock_bh(&call->lock);
1126                if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1127                        goto maybe_reschedule;
1128        }
1129
1130        /* post a notification of an acceptable connection to the app */
1131        if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1132                _debug("post accept");
1133                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1134                                       0, false) < 0)
1135                        goto no_mem;
1136                clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1137                goto maybe_reschedule;
1138        }
1139
1140        /* handle incoming call acceptance */
            /* rx_data_post is asserted to be 0 and then set to 1; unless the call
             * has reached COMPLETE or beyond, the out-of-sequence queue drain is
             * requested and normally runs in the very next block */
1141        if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1142                _debug("accepted");
1143                ASSERTCMP(call->rx_data_post, ==, 0);
1144                call->rx_data_post = 1;
1145                read_lock_bh(&call->state_lock);
1146                if (call->state < RXRPC_CALL_COMPLETE)
1147                        set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1148                read_unlock_bh(&call->state_lock);
1149        }
1150
1151        /* drain the out of sequence received packet queue into the packet Rx
1152         * queue */
            /* keep draining for as long as rx_data_post equals rx_first_oos and
             * the drain helper does not report an error */
1153        if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
1154                while (call->rx_data_post == call->rx_first_oos)
1155                        if (rxrpc_drain_rx_oos_queue(call) < 0)
1156                                break;
1157                goto maybe_reschedule;
1158        }
1159
1160        /* other events may have been raised since we started checking */
1161        goto maybe_reschedule;
1162
            /* maxSkew is the connection's hi_serial minus the serial being ACK'd,
             * truncated to 16 bits by htons() */
1163send_ACK_with_skew:
1164        ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1165                            ntohl(ack.serial));
            /* common ACK tail: fill in the ackinfo trailer (maxMTU is the peer's
             * if_mtu less its hdrsize), allocate a serial number and stop the ACK
             * timer before transmitting */
1166send_ACK:
1167        mtu = call->conn->trans->peer->if_mtu;
1168        mtu -= call->conn->trans->peer->hdrsize;
1169        ackinfo.maxMTU  = htonl(mtu);
1170        ackinfo.rwind   = htonl(32);
1171
1172        /* permit the peer to send us jumbo packets if it wants to */
1173        ackinfo.rxMTU   = htonl(5692);
1174        ackinfo.jumbo_max = htonl(4);
1175
1176        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1177        _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1178               ntohl(hdr.serial),
1179               ntohs(ack.maxSkew),
1180               ntohl(ack.firstPacket),
1181               ntohl(ack.previousPacket),
1182               ntohl(ack.serial),
1183               rxrpc_acks[ack.reason],
1184               ack.nAcks);
1185
1186        del_timer_sync(&call->ack_timer);
            /* record in call->flags that this ACK carries at least one soft-ACK
             * entry */
1187        if (ack.nAcks > 0)
1188                set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1189        goto send_message_2;
1190
            /* non-ACK packets (BUSY, ABORT) get their serial number here */
1191send_message:
1192        _debug("send message");
1193
1194        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1195        _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1196send_message_2:
1197
            /* find the highest iov slot with a non-zero length and send every slot
             * up to and including it; a lower slot with zero length (e.g. the
             * soft-ACK array when nAcks is 0) is still counted in ioc but adds
             * nothing to len */
1198        len = iov[0].iov_len;
1199        ioc = 1;
1200        if (iov[4].iov_len) {
1201                ioc = 5;
1202                len += iov[4].iov_len;
1203                len += iov[3].iov_len;
1204                len += iov[2].iov_len;
1205                len += iov[1].iov_len;
1206        } else if (iov[3].iov_len) {
1207                ioc = 4;
1208                len += iov[3].iov_len;
1209                len += iov[2].iov_len;
1210                len += iov[1].iov_len;
1211        } else if (iov[2].iov_len) {
1212                ioc = 3;
1213                len += iov[2].iov_len;
1214                len += iov[1].iov_len;
1215        } else if (iov[1].iov_len) {
1216                ioc = 2;
1217                len += iov[1].iov_len;
1218        }
1219
1220        ret = kernel_sendmsg(call->conn->trans->local->socket,
1221                             &msg, iov, ioc, len);
            /* on failure the generating event bit has not been cleared yet, so the
             * call is requeued (unless dead) to try again; the kill_ACKs and
             * maybe_reschedule steps are skipped */
1222        if (ret < 0) {
1223                _debug("sendmsg failed: %d", ret);
1224                read_lock_bh(&call->state_lock);
1225                if (call->state < RXRPC_CALL_DEAD)
1226                        rxrpc_queue_call(call);
1227                read_unlock_bh(&call->state_lock);
1228                goto error;
1229        }
1230
            /* post-transmission bookkeeping, keyed on the event that generated the
             * packet; genbit was assigned on every path that reaches this point */
1231        switch (genbit) {
1232        case RXRPC_CALL_ABORT:
1233                clear_bit(genbit, &call->events);
1234                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1235                goto kill_ACKs;
1236
            /* the ACK_FINAL bit is not cleared here; kill_ACKs does that and drops
             * a call reference along with it */
1237        case RXRPC_CALL_ACK_FINAL:
1238                write_lock_bh(&call->state_lock);
1239                if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1240                        call->state = RXRPC_CALL_COMPLETE;
1241                write_unlock_bh(&call->state_lock);
1242                goto kill_ACKs;
1243
1244        default:
1245                clear_bit(genbit, &call->events);
                    /* in the four states listed, propose a non-immediate DELAY
                     * ACK; there is no break after the proposal, so control falls
                     * into the inner default, which only breaks */
1246                switch (call->state) {
1247                case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1248                case RXRPC_CALL_CLIENT_RECV_REPLY:
1249                case RXRPC_CALL_SERVER_RECV_REQUEST:
1250                case RXRPC_CALL_SERVER_ACK_REQUEST:
1251                        _debug("start ACK timer");
1252                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1253                                          call->ackr_serial, false);
1254                default:
1255                        break;
1256                }
1257                goto maybe_reschedule;
1258        }
1259
            /* no further ACKs are wanted: stop the ACK timer and discard pending
             * ACK events.  A pending ACK_FINAL is dropped together with a call
             * reference - presumably the one taken when the event was raised;
             * confirm at the site that sets RXRPC_CALL_ACK_FINAL */
1260kill_ACKs:
1261        del_timer_sync(&call->ack_timer);
1262        if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1263                rxrpc_put_call(call);
1264        clear_bit(RXRPC_CALL_ACK, &call->events);
1265
            /* requeue if there is still work outstanding and the call is not dead */
1266maybe_reschedule:
1267        if (call->events || !skb_queue_empty(&call->rx_queue)) {
1268                read_lock_bh(&call->state_lock);
1269                if (call->state < RXRPC_CALL_DEAD)
1270                        rxrpc_queue_call(call);
1271                read_unlock_bh(&call->state_lock);
1272        }
1273
1274        /* don't leave aborted connections on the accept queue */
            /* raise the RELEASE event (at most once) so that a later pass runs
             * rxrpc_release_call() on the still-linked call */
1275        if (call->state >= RXRPC_CALL_COMPLETE &&
1276            !list_empty(&call->accept_link)) {
1277                _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1278                       call, call->events, call->flags,
1279                       ntohl(call->conn->cid));
1280
1281                read_lock_bh(&call->state_lock);
1282                if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1283                    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1284                        rxrpc_queue_call(call);
1285                read_unlock_bh(&call->state_lock);
1286        }
1287
            /* common exit: drop the busy marker and free the soft-ACK array (acks
             * is NULL when none was allocated) */
1288error:
1289        clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1290        kfree(acks);
1291
1292        /* because we don't want two CPUs both processing the work item for one
1293         * call at the same time, we use a flag to note when it's busy; however
1294         * this means there's a race between clearing the flag and setting the
1295         * work pending bit and the work item being processed again */
1296        if (call->events && !work_pending(&call->processor)) {
1297                _debug("jumpstart %x", ntohl(call->conn->cid));
1298                rxrpc_queue_call(call);
1299        }
1300
1301        _leave("");
1302        return;
1303
            /* allocation failure: the event being handled was deliberately left
             * set, so maybe_reschedule requeues the call (unless it is dead) to
             * try again */
1304no_mem:
1305        _debug("out of memory");
1306        goto maybe_reschedule;
1307}
1308
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.