linux/net/rxrpc/ar-recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/net.h>
  13#include <linux/skbuff.h>
  14#include <linux/export.h>
  15#include <net/sock.h>
  16#include <net/af_rxrpc.h>
  17#include "ar-internal.h"
  18
  19/*
  20 * removal a call's user ID from the socket tree to make the user ID available
  21 * again and so that it won't be seen again in association with that call
  22 */
  23void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
  24{
  25        _debug("RELEASE CALL %d", call->debug_id);
  26
  27        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
  28                write_lock_bh(&rx->call_lock);
  29                rb_erase(&call->sock_node, &call->socket->calls);
  30                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
  31                write_unlock_bh(&rx->call_lock);
  32        }
  33
  34        read_lock_bh(&call->state_lock);
  35        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
  36            !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
  37                rxrpc_queue_call(call);
  38        read_unlock_bh(&call->state_lock);
  39}
  40
  41/*
  42 * receive a message from an RxRPC socket
  43 * - we need to be careful about two or more threads calling recvmsg
  44 *   simultaneously
  45 */
  46int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
  47                  struct msghdr *msg, size_t len, int flags)
  48{
  49        struct rxrpc_skb_priv *sp;
  50        struct rxrpc_call *call = NULL, *continue_call = NULL;
  51        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  52        struct sk_buff *skb;
  53        long timeo;
  54        int copy, ret, ullen, offset, copied = 0;
  55        u32 abort_code;
  56
  57        DEFINE_WAIT(wait);
  58
  59        _enter(",,,%zu,%d", len, flags);
  60
  61        if (flags & (MSG_OOB | MSG_TRUNC))
  62                return -EOPNOTSUPP;
  63
  64        ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
  65
  66        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
  67        msg->msg_flags |= MSG_MORE;
  68
  69        lock_sock(&rx->sk);
  70
  71        for (;;) {
  72                /* return immediately if a client socket has no outstanding
  73                 * calls */
  74                if (RB_EMPTY_ROOT(&rx->calls)) {
  75                        if (copied)
  76                                goto out;
  77                        if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
  78                                release_sock(&rx->sk);
  79                                if (continue_call)
  80                                        rxrpc_put_call(continue_call);
  81                                return -ENODATA;
  82                        }
  83                }
  84
  85                /* get the next message on the Rx queue */
  86                skb = skb_peek(&rx->sk.sk_receive_queue);
  87                if (!skb) {
  88                        /* nothing remains on the queue */
  89                        if (copied &&
  90                            (msg->msg_flags & MSG_PEEK || timeo == 0))
  91                                goto out;
  92
  93                        /* wait for a message to turn up */
  94                        release_sock(&rx->sk);
  95                        prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
  96                                                  TASK_INTERRUPTIBLE);
  97                        ret = sock_error(&rx->sk);
  98                        if (ret)
  99                                goto wait_error;
 100
 101                        if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
 102                                if (signal_pending(current))
 103                                        goto wait_interrupted;
 104                                timeo = schedule_timeout(timeo);
 105                        }
 106                        finish_wait(sk_sleep(&rx->sk), &wait);
 107                        lock_sock(&rx->sk);
 108                        continue;
 109                }
 110
 111        peek_next_packet:
 112                sp = rxrpc_skb(skb);
 113                call = sp->call;
 114                ASSERT(call != NULL);
 115
 116                _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
 117
 118                /* make sure we wait for the state to be updated in this call */
 119                spin_lock_bh(&call->lock);
 120                spin_unlock_bh(&call->lock);
 121
 122                if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
 123                        _debug("packet from released call");
 124                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 125                                BUG();
 126                        rxrpc_free_skb(skb);
 127                        continue;
 128                }
 129
 130                /* determine whether to continue last data receive */
 131                if (continue_call) {
 132                        _debug("maybe cont");
 133                        if (call != continue_call ||
 134                            skb->mark != RXRPC_SKB_MARK_DATA) {
 135                                release_sock(&rx->sk);
 136                                rxrpc_put_call(continue_call);
 137                                _leave(" = %d [noncont]", copied);
 138                                return copied;
 139                        }
 140                }
 141
 142                rxrpc_get_call(call);
 143
 144                /* copy the peer address and timestamp */
 145                if (!continue_call) {
 146                        if (msg->msg_name && msg->msg_namelen > 0)
 147                                memcpy(msg->msg_name,
 148                                       &call->conn->trans->peer->srx,
 149                                       sizeof(call->conn->trans->peer->srx));
 150                        sock_recv_ts_and_drops(msg, &rx->sk, skb);
 151                }
 152
 153                /* receive the message */
 154                if (skb->mark != RXRPC_SKB_MARK_DATA)
 155                        goto receive_non_data_message;
 156
 157                _debug("recvmsg DATA #%u { %d, %d }",
 158                       ntohl(sp->hdr.seq), skb->len, sp->offset);
 159
 160                if (!continue_call) {
 161                        /* only set the control data once per recvmsg() */
 162                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 163                                       ullen, &call->user_call_ID);
 164                        if (ret < 0)
 165                                goto copy_error;
 166                        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 167                }
 168
 169                ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 170                ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 171                call->rx_data_recv = ntohl(sp->hdr.seq);
 172
 173                ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 174
 175                offset = sp->offset;
 176                copy = skb->len - offset;
 177                if (copy > len - copied)
 178                        copy = len - copied;
 179
 180                if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
 181                        ret = skb_copy_datagram_iovec(skb, offset,
 182                                                      msg->msg_iov, copy);
 183                } else {
 184                        ret = skb_copy_and_csum_datagram_iovec(skb, offset,
 185                                                               msg->msg_iov);
 186                        if (ret == -EINVAL)
 187                                goto csum_copy_error;
 188                }
 189
 190                if (ret < 0)
 191                        goto copy_error;
 192
 193                /* handle piecemeal consumption of data packets */
 194                _debug("copied %d+%d", copy, copied);
 195
 196                offset += copy;
 197                copied += copy;
 198
 199                if (!(flags & MSG_PEEK))
 200                        sp->offset = offset;
 201
 202                if (sp->offset < skb->len) {
 203                        _debug("buffer full");
 204                        ASSERTCMP(copied, ==, len);
 205                        break;
 206                }
 207
 208                /* we transferred the whole data packet */
 209                if (sp->hdr.flags & RXRPC_LAST_PACKET) {
 210                        _debug("last");
 211                        if (call->conn->out_clientflag) {
 212                                 /* last byte of reply received */
 213                                ret = copied;
 214                                goto terminal_message;
 215                        }
 216
 217                        /* last bit of request received */
 218                        if (!(flags & MSG_PEEK)) {
 219                                _debug("eat packet");
 220                                if (skb_dequeue(&rx->sk.sk_receive_queue) !=
 221                                    skb)
 222                                        BUG();
 223                                rxrpc_free_skb(skb);
 224                        }
 225                        msg->msg_flags &= ~MSG_MORE;
 226                        break;
 227                }
 228
 229                /* move on to the next data message */
 230                _debug("next");
 231                if (!continue_call)
 232                        continue_call = sp->call;
 233                else
 234                        rxrpc_put_call(call);
 235                call = NULL;
 236
 237                if (flags & MSG_PEEK) {
 238                        _debug("peek next");
 239                        skb = skb->next;
 240                        if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
 241                                break;
 242                        goto peek_next_packet;
 243                }
 244
 245                _debug("eat packet");
 246                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 247                        BUG();
 248                rxrpc_free_skb(skb);
 249        }
 250
 251        /* end of non-terminal data packet reception for the moment */
 252        _debug("end rcv data");
 253out:
 254        release_sock(&rx->sk);
 255        if (call)
 256                rxrpc_put_call(call);
 257        if (continue_call)
 258                rxrpc_put_call(continue_call);
 259        _leave(" = %d [data]", copied);
 260        return copied;
 261
 262        /* handle non-DATA messages such as aborts, incoming connections and
 263         * final ACKs */
 264receive_non_data_message:
 265        _debug("non-data");
 266
 267        if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
 268                _debug("RECV NEW CALL");
 269                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
 270                if (ret < 0)
 271                        goto copy_error;
 272                if (!(flags & MSG_PEEK)) {
 273                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 274                                BUG();
 275                        rxrpc_free_skb(skb);
 276                }
 277                goto out;
 278        }
 279
 280        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 281                       ullen, &call->user_call_ID);
 282        if (ret < 0)
 283                goto copy_error;
 284        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 285
 286        switch (skb->mark) {
 287        case RXRPC_SKB_MARK_DATA:
 288                BUG();
 289        case RXRPC_SKB_MARK_FINAL_ACK:
 290                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
 291                break;
 292        case RXRPC_SKB_MARK_BUSY:
 293                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
 294                break;
 295        case RXRPC_SKB_MARK_REMOTE_ABORT:
 296                abort_code = call->abort_code;
 297                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
 298                break;
 299        case RXRPC_SKB_MARK_NET_ERROR:
 300                _debug("RECV NET ERROR %d", sp->error);
 301                abort_code = sp->error;
 302                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
 303                break;
 304        case RXRPC_SKB_MARK_LOCAL_ERROR:
 305                _debug("RECV LOCAL ERROR %d", sp->error);
 306                abort_code = sp->error;
 307                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
 308                               &abort_code);
 309                break;
 310        default:
 311                BUG();
 312                break;
 313        }
 314
 315        if (ret < 0)
 316                goto copy_error;
 317
 318terminal_message:
 319        _debug("terminal");
 320        msg->msg_flags &= ~MSG_MORE;
 321        msg->msg_flags |= MSG_EOR;
 322
 323        if (!(flags & MSG_PEEK)) {
 324                _net("free terminal skb %p", skb);
 325                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
 326                        BUG();
 327                rxrpc_free_skb(skb);
 328                rxrpc_remove_user_ID(rx, call);
 329        }
 330
 331        release_sock(&rx->sk);
 332        rxrpc_put_call(call);
 333        if (continue_call)
 334                rxrpc_put_call(continue_call);
 335        _leave(" = %d", ret);
 336        return ret;
 337
 338copy_error:
 339        _debug("copy error");
 340        release_sock(&rx->sk);
 341        rxrpc_put_call(call);
 342        if (continue_call)
 343                rxrpc_put_call(continue_call);
 344        _leave(" = %d", ret);
 345        return ret;
 346
 347csum_copy_error:
 348        _debug("csum error");
 349        release_sock(&rx->sk);
 350        if (continue_call)
 351                rxrpc_put_call(continue_call);
 352        rxrpc_kill_skb(skb);
 353        skb_kill_datagram(&rx->sk, skb, flags);
 354        rxrpc_put_call(call);
 355        return -EAGAIN;
 356
 357wait_interrupted:
 358        ret = sock_intr_errno(timeo);
 359wait_error:
 360        finish_wait(sk_sleep(&rx->sk), &wait);
 361        if (continue_call)
 362                rxrpc_put_call(continue_call);
 363        if (copied)
 364                copied = ret;
 365        _leave(" = %d [waitfail %d]", copied, ret);
 366        return copied;
 367
 368}
 369
 370/**
 371 * rxrpc_kernel_data_delivered - Record delivery of data message
 372 * @skb: Message holding data
 373 *
 374 * Record the delivery of a data message.  This permits RxRPC to keep its
 375 * tracking correct.  The socket buffer will be deleted.
 376 */
 377void rxrpc_kernel_data_delivered(struct sk_buff *skb)
 378{
 379        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 380        struct rxrpc_call *call = sp->call;
 381
 382        ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
 383        ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
 384        call->rx_data_recv = ntohl(sp->hdr.seq);
 385
 386        ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
 387        rxrpc_free_skb(skb);
 388}
 389
 390EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
 391
 392/**
 393 * rxrpc_kernel_is_data_last - Determine if data message is last one
 394 * @skb: Message holding data
 395 *
 396 * Determine if data message is last one for the parent call.
 397 */
 398bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
 399{
 400        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 401
 402        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
 403
 404        return sp->hdr.flags & RXRPC_LAST_PACKET;
 405}
 406
 407EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
 408
 409/**
 410 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
 411 * @skb: Message indicating an abort
 412 *
 413 * Get the abort code from an RxRPC abort message.
 414 */
 415u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
 416{
 417        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 418
 419        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
 420
 421        return sp->call->abort_code;
 422}
 423
 424EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
 425
 426/**
 427 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
 428 * @skb: Message indicating an error
 429 *
 430 * Get the error number from an RxRPC error message.
 431 */
 432int rxrpc_kernel_get_error_number(struct sk_buff *skb)
 433{
 434        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 435
 436        return sp->error;
 437}
 438
 439EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
 440