linux/net/rxrpc/ar-recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/net.h>
  13#include <linux/skbuff.h>
  14#include <net/sock.h>
  15#include <net/af_rxrpc.h>
  16#include "ar-internal.h"
  17
  18/*
  19 * removal a call's user ID from the socket tree to make the user ID available
  20 * again and so that it won't be seen again in association with that call
  21 */
  22void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
  23{
  24        _debug("RELEASE CALL %d", call->debug_id);
  25
  26        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
  27                write_lock_bh(&rx->call_lock);
  28                rb_erase(&call->sock_node, &call->socket->calls);
  29                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
  30                write_unlock_bh(&rx->call_lock);
  31        }
  32
  33        read_lock_bh(&call->state_lock);
  34        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
  35            !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
  36                rxrpc_queue_call(call);
  37        read_unlock_bh(&call->state_lock);
  38}
  39
  40/*
  41 * receive a message from an RxRPC socket
  42 * - we need to be careful about two or more threads calling recvmsg
  43 *   simultaneously
  44 */
  45int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
  46                  struct msghdr *msg, size_t len, int flags)
  47{
  48        struct rxrpc_skb_priv *sp;
  49        struct rxrpc_call *call = NULL, *continue_call = NULL;
  50        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  51        struct sk_buff *skb;
  52        long timeo;
  53        int copy, ret, ullen, offset, copied = 0;
  54        u32 abort_code;
  55
  56        DEFINE_WAIT(wait);
  57
  58        _enter(",,,%zu,%d", len, flags);
  59
  60        if (flags & (MSG_OOB | MSG_TRUNC))
  61                return -EOPNOTSUPP;
  62
  63        ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
  64
  65        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
  66        msg->msg_flags |= MSG_MORE;
  67
  68        lock_sock(&rx->sk);
  69
  70        for (;;) {
  71                /* return immediately if a client socket has no outstanding
  72                 * calls */
  73                if (RB_EMPTY_ROOT(&rx->calls)) {
  74                        if (copied)
  75                                goto out;
  76                        if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
  77                                release_sock(&rx->sk);
  78                                if (continue_call)
  79                                        rxrpc_put_call(continue_call);
  80                                return -ENODATA;
  81                        }
  82                }
  83
  84                /* get the next message on the Rx queue */
  85                skb = skb_peek(&rx->sk.sk_receive_queue);
  86                if (!skb) {
  87                        /* nothing remains on the queue */
  88                        if (copied &&
  89                            (msg->msg_flags & MSG_PEEK || timeo == 0))
  90                                goto out;
  91
  92                        /* wait for a message to turn up */
  93                        release_sock(&rx->sk);
  94                        prepare_to_wait_exclusive(rx->sk.sk_sleep, &wait,
  95                                                  TASK_INTERRUPTIBLE);
  96                        ret = sock_error(&rx->sk);
  97                        if (ret)
  98                                goto wait_error;
  99
 100                        if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
 101                                if (signal_pending(current))
 102                                        goto wait_interrupted;
 103                                timeo = schedule_timeout(timeo);
 104                        }
 105                        finish_wait(rx->sk.sk_sleep, &wait);
 106                        lock_sock(&rx->sk);
 107                        continue;
 108                }
 109
 110        peek_next_packet:
 111                sp = rxrpc_skb(skb);
 112                call = sp->call;
 113                ASSERT(call != NULL);
 114
 115                _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
 116
 117                /* make sure we wait for the state to be updated in this call */
 118                spin_lock_bh(&call->lock);
 119                spin_unlock_bh(&call->lock);
 120
 121                if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
 122                        _debug("packet from released call");
 123                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 124                                BUG();
 125                        rxrpc_free_skb(skb);
 126                        continue;
 127                }
 128
 129                /* determine whether to continue last data receive */
 130                if (continue_call) {
 131                        _debug("maybe cont");
 132                        if (call != continue_call ||
 133                            skb->mark != RXRPC_SKB_MARK_DATA) {
 134                                release_sock(&rx->sk);
 135                                rxrpc_put_call(continue_call);
 136                                _leave(" = %d [noncont]", copied);
 137                                return copied;
 138                        }
 139                }
 140
 141                rxrpc_get_call(call);
 142
 143                /* copy the peer address and timestamp */
 144                if (!continue_call) {
 145                        if (msg->msg_name && msg->msg_namelen > 0)
 146                                memcpy(msg->msg_name,
 147                                       &call->conn->trans->peer->srx,
 148                                       sizeof(call->conn->trans->peer->srx));
 149                        sock_recv_timestamp(msg, &rx->sk, skb);
 150                }
 151
 152                /* receive the message */
 153                if (skb->mark != RXRPC_SKB_MARK_DATA)
 154                        goto receive_non_data_message;
 155
 156                _debug("recvmsg DATA #%u { %d, %d }",
 157                       ntohl(sp->hdr.seq), skb->len, sp->offset);
 158
 159                if (!continue_call) {
 160                        /* only set the control data once per recvmsg() */
 161                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 162                                       ullen, &call->user_call_ID);
 163                        if (ret < 0)
 164                                goto copy_error;
 165                        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 166                }
 167
 168                ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 169                ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 170                call->rx_data_recv = ntohl(sp->hdr.seq);
 171
 172                ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 173
 174                offset = sp->offset;
 175                copy = skb->len - offset;
 176                if (copy > len - copied)
 177                        copy = len - copied;
 178
 179                if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
 180                        ret = skb_copy_datagram_iovec(skb, offset,
 181                                                      msg->msg_iov, copy);
 182                } else {
 183                        ret = skb_copy_and_csum_datagram_iovec(skb, offset,
 184                                                               msg->msg_iov);
 185                        if (ret == -EINVAL)
 186                                goto csum_copy_error;
 187                }
 188
 189                if (ret < 0)
 190                        goto copy_error;
 191
 192                /* handle piecemeal consumption of data packets */
 193                _debug("copied %d+%d", copy, copied);
 194
 195                offset += copy;
 196                copied += copy;
 197
 198                if (!(flags & MSG_PEEK))
 199                        sp->offset = offset;
 200
 201                if (sp->offset < skb->len) {
 202                        _debug("buffer full");
 203                        ASSERTCMP(copied, ==, len);
 204                        break;
 205                }
 206
 207                /* we transferred the whole data packet */
 208                if (sp->hdr.flags & RXRPC_LAST_PACKET) {
 209                        _debug("last");
 210                        if (call->conn->out_clientflag) {
 211                                 /* last byte of reply received */
 212                                ret = copied;
 213                                goto terminal_message;
 214                        }
 215
 216                        /* last bit of request received */
 217                        if (!(flags & MSG_PEEK)) {
 218                                _debug("eat packet");
 219                                if (skb_dequeue(&rx->sk.sk_receive_queue) !=
 220                                    skb)
 221                                        BUG();
 222                                rxrpc_free_skb(skb);
 223                        }
 224                        msg->msg_flags &= ~MSG_MORE;
 225                        break;
 226                }
 227
 228                /* move on to the next data message */
 229                _debug("next");
 230                if (!continue_call)
 231                        continue_call = sp->call;
 232                else
 233                        rxrpc_put_call(call);
 234                call = NULL;
 235
 236                if (flags & MSG_PEEK) {
 237                        _debug("peek next");
 238                        skb = skb->next;
 239                        if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
 240                                break;
 241                        goto peek_next_packet;
 242                }
 243
 244                _debug("eat packet");
 245                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 246                        BUG();
 247                rxrpc_free_skb(skb);
 248        }
 249
 250        /* end of non-terminal data packet reception for the moment */
 251        _debug("end rcv data");
 252out:
 253        release_sock(&rx->sk);
 254        if (call)
 255                rxrpc_put_call(call);
 256        if (continue_call)
 257                rxrpc_put_call(continue_call);
 258        _leave(" = %d [data]", copied);
 259        return copied;
 260
 261        /* handle non-DATA messages such as aborts, incoming connections and
 262         * final ACKs */
 263receive_non_data_message:
 264        _debug("non-data");
 265
 266        if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
 267                _debug("RECV NEW CALL");
 268                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
 269                if (ret < 0)
 270                        goto copy_error;
 271                if (!(flags & MSG_PEEK)) {
 272                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 273                                BUG();
 274                        rxrpc_free_skb(skb);
 275                }
 276                goto out;
 277        }
 278
 279        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 280                       ullen, &call->user_call_ID);
 281        if (ret < 0)
 282                goto copy_error;
 283        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 284
 285        switch (skb->mark) {
 286        case RXRPC_SKB_MARK_DATA:
 287                BUG();
 288        case RXRPC_SKB_MARK_FINAL_ACK:
 289                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
 290                break;
 291        case RXRPC_SKB_MARK_BUSY:
 292                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
 293                break;
 294        case RXRPC_SKB_MARK_REMOTE_ABORT:
 295                abort_code = call->abort_code;
 296                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
 297                break;
 298        case RXRPC_SKB_MARK_NET_ERROR:
 299                _debug("RECV NET ERROR %d", sp->error);
 300                abort_code = sp->error;
 301                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
 302                break;
 303        case RXRPC_SKB_MARK_LOCAL_ERROR:
 304                _debug("RECV LOCAL ERROR %d", sp->error);
 305                abort_code = sp->error;
 306                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
 307                               &abort_code);
 308                break;
 309        default:
 310                BUG();
 311                break;
 312        }
 313
 314        if (ret < 0)
 315                goto copy_error;
 316
 317terminal_message:
 318        _debug("terminal");
 319        msg->msg_flags &= ~MSG_MORE;
 320        msg->msg_flags |= MSG_EOR;
 321
 322        if (!(flags & MSG_PEEK)) {
 323                _net("free terminal skb %p", skb);
 324                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 325                        BUG();
 326                rxrpc_free_skb(skb);
 327                rxrpc_remove_user_ID(rx, call);
 328        }
 329
 330        release_sock(&rx->sk);
 331        rxrpc_put_call(call);
 332        if (continue_call)
 333                rxrpc_put_call(continue_call);
 334        _leave(" = %d", ret);
 335        return ret;
 336
 337copy_error:
 338        _debug("copy error");
 339        release_sock(&rx->sk);
 340        rxrpc_put_call(call);
 341        if (continue_call)
 342                rxrpc_put_call(continue_call);
 343        _leave(" = %d", ret);
 344        return ret;
 345
 346csum_copy_error:
 347        _debug("csum error");
 348        release_sock(&rx->sk);
 349        if (continue_call)
 350                rxrpc_put_call(continue_call);
 351        rxrpc_kill_skb(skb);
 352        skb_kill_datagram(&rx->sk, skb, flags);
 353        rxrpc_put_call(call);
 354        return -EAGAIN;
 355
 356wait_interrupted:
 357        ret = sock_intr_errno(timeo);
 358wait_error:
 359        finish_wait(rx->sk.sk_sleep, &wait);
 360        if (continue_call)
 361                rxrpc_put_call(continue_call);
 362        if (copied)
 363                copied = ret;
 364        _leave(" = %d [waitfail %d]", copied, ret);
 365        return copied;
 366
 367}
 368
 369/**
 370 * rxrpc_kernel_data_delivered - Record delivery of data message
 371 * @skb: Message holding data
 372 *
 373 * Record the delivery of a data message.  This permits RxRPC to keep its
 374 * tracking correct.  The socket buffer will be deleted.
 375 */
 376void rxrpc_kernel_data_delivered(struct sk_buff *skb)
 377{
 378        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 379        struct rxrpc_call *call = sp->call;
 380
 381        ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 382        ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 383        call->rx_data_recv = ntohl(sp->hdr.seq);
 384
 385        ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 386        rxrpc_free_skb(skb);
 387}
 388
 389EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
 390
 391/**
 392 * rxrpc_kernel_is_data_last - Determine if data message is last one
 393 * @skb: Message holding data
 394 *
 395 * Determine if data message is last one for the parent call.
 396 */
 397bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
 398{
 399        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 400
 401        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
 402
 403        return sp->hdr.flags & RXRPC_LAST_PACKET;
 404}
 405
 406EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
 407
 408/**
 409 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
 410 * @skb: Message indicating an abort
 411 *
 412 * Get the abort code from an RxRPC abort message.
 413 */
 414u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
 415{
 416        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 417
 418        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
 419
 420        return sp->call->abort_code;
 421}
 422
 423EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
 424
 425/**
 426 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
 427 * @skb: Message indicating an error
 428 *
 429 * Get the error number from an RxRPC error message.
 430 */
 431int rxrpc_kernel_get_error_number(struct sk_buff *skb)
 432{
 433        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 434
 435        return sp->error;
 436}
 437
 438EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
 439