darwin-xnu/osfmk/ipc/ipc_mqueue.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
   3 *
   4 * @APPLE_LICENSE_HEADER_START@
   5 * 
   6 * The contents of this file constitute Original Code as defined in and
   7 * are subject to the Apple Public Source License Version 1.1 (the
   8 * "License").  You may not use this file except in compliance with the
   9 * License.  Please obtain a copy of the License at
  10 * http://www.apple.com/publicsource and read it before using this file.
  11 * 
  12 * This Original Code and all software distributed under the License are
  13 * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
  14 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
  15 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
  16 * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT.  Please see the
  17 * License for the specific language governing rights and limitations
  18 * under the License.
  19 * 
  20 * @APPLE_LICENSE_HEADER_END@
  21 */
  22/*
  23 * @OSF_FREE_COPYRIGHT@
  24 */
  25/* 
  26 * Mach Operating System
  27 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
  28 * All Rights Reserved.
  29 * 
  30 * Permission to use, copy, modify and distribute this software and its
  31 * documentation is hereby granted, provided that both the copyright
  32 * notice and this permission notice appear in all copies of the
  33 * software, derivative works or modified versions, and any portions
  34 * thereof, and that both notices appear in supporting documentation.
  35 * 
  36 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
  37 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
  38 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
  39 * 
  40 * Carnegie Mellon requests users of this software to return to
  41 * 
  42 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
  43 *  School of Computer Science
  44 *  Carnegie Mellon University
  45 *  Pittsburgh PA 15213-3890
  46 * 
  47 * any improvements or extensions that they make and grant Carnegie Mellon
  48 * the rights to redistribute these changes.
  49 */
  50/*
  51 */
  52/*
  53 *      File:   ipc/ipc_mqueue.c
  54 *      Author: Rich Draves
  55 *      Date:   1989
  56 *
  57 *      Functions to manipulate IPC message queues.
  58 */
  59
  60#include <mach/port.h>
  61#include <mach/message.h>
  62#include <mach/sync_policy.h>
  63
  64#include <kern/assert.h>
  65#include <kern/counters.h>
  66#include <kern/sched_prim.h>
  67#include <kern/ipc_kobject.h>
  68#include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
  69#include <kern/misc_protos.h>
  70#include <kern/task.h>
  71#include <kern/thread.h>
  72#include <kern/wait_queue.h>
  73
  74#include <ipc/ipc_mqueue.h>
  75#include <ipc/ipc_kmsg.h>
  76#include <ipc/ipc_port.h>
  77#include <ipc/ipc_pset.h>
  78#include <ipc/ipc_space.h>
  79
  80#include <ddb/tr.h>
  81
  82int ipc_mqueue_full;            /* address is event for queue space */
  83int ipc_mqueue_rcv;             /* address is event for message arrival */
  84
  85#define TR_ENABLE 0
  86
  87/* forward declarations */
  88void ipc_mqueue_receive_results(wait_result_t result);
  89
  90/*
  91 *      Routine:        ipc_mqueue_init
  92 *      Purpose:
  93 *              Initialize a newly-allocated message queue.
  94 */
  95void
  96ipc_mqueue_init(
  97        ipc_mqueue_t    mqueue,
  98        boolean_t       is_set)
  99{
 100        if (is_set) {
 101                wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
 102        } else {
 103                wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
 104                ipc_kmsg_queue_init(&mqueue->imq_messages);
 105                mqueue->imq_seqno = 0;
 106                mqueue->imq_msgcount = 0;
 107                mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
 108                mqueue->imq_fullwaiters = FALSE;
 109        }
 110}
 111
 112/*
 113 *      Routine:        ipc_mqueue_member
 114 *      Purpose:
 115 *              Indicate whether the (port) mqueue is a member of
 116 *              this portset's mqueue.  We do this by checking
 117 *              whether the portset mqueue's waitq is an member of
 118 *              the port's mqueue waitq.
 119 *      Conditions:
 120 *              the portset's mqueue is not already a member
 121 *              this may block while allocating linkage structures.
 122 */
 123
 124boolean_t
 125ipc_mqueue_member(
 126        ipc_mqueue_t            port_mqueue,
 127        ipc_mqueue_t            set_mqueue)
 128{
 129        wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
 130        wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
 131
 132        return (wait_queue_member(port_waitq, set_waitq));
 133
 134}
 135
 136/*
 137 *      Routine:        ipc_mqueue_remove
 138 *      Purpose:
 139 *              Remove the association between the queue and the specified
 140 *              set message queue.
 141 */
 142
 143kern_return_t
 144ipc_mqueue_remove(
 145        ipc_mqueue_t     mqueue,
 146        ipc_mqueue_t     set_mqueue)
 147{
 148        wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
 149        wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
 150
 151        return wait_queue_unlink(mq_waitq, set_waitq);
 152}
 153
 154/*
 155 *      Routine:        ipc_mqueue_remove_from_all
 156 *      Purpose:
 157 *              Remove the mqueue from all the sets it is a member of
 158 *      Conditions:
 159 *              Nothing locked.
 160 */
 161void
 162ipc_mqueue_remove_from_all(
 163        ipc_mqueue_t    mqueue)
 164{
 165        wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
 166
 167        wait_queue_unlink_all(mq_waitq);
 168        return;
 169}
 170
 171/*
 172 *      Routine:        ipc_mqueue_remove_all
 173 *      Purpose:
 174 *              Remove all the member queues from the specified set.
 175 *      Conditions:
 176 *              Nothing locked.
 177 */
 178void
 179ipc_mqueue_remove_all(
 180        ipc_mqueue_t    mqueue)
 181{
 182        wait_queue_set_t        mq_setq = &mqueue->imq_set_queue;
 183
 184        wait_queue_set_unlink_all(mq_setq);
 185        return;
 186}
 187
 188
 189/*
 190 *      Routine:        ipc_mqueue_add
 191 *      Purpose:
 192 *              Associate the portset's mqueue with the port's mqueue.
 193 *              This has to be done so that posting the port will wakeup
 194 *              a portset waiter.  If there are waiters on the portset
 195 *              mqueue and messages on the port mqueue, try to match them
 196 *              up now.
 197 *      Conditions:
 198 *              May block.
 199 */
 200kern_return_t
 201ipc_mqueue_add(
 202        ipc_mqueue_t     port_mqueue,
 203        ipc_mqueue_t     set_mqueue)
 204{
 205        wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
 206        wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
 207        ipc_kmsg_queue_t kmsgq;
 208        ipc_kmsg_t       kmsg, next;
 209        kern_return_t    kr;
 210        spl_t            s;
 211
 212        kr = wait_queue_link(port_waitq, set_waitq);
 213        if (kr != KERN_SUCCESS)
 214                return kr;
 215
 216        /*
 217         * Now that the set has been added to the port, there may be
 218         * messages queued on the port and threads waiting on the set
 219         * waitq.  Lets get them together.
 220         */
 221        s = splsched();
 222        imq_lock(port_mqueue);
 223        kmsgq = &port_mqueue->imq_messages;
 224        for (kmsg = ipc_kmsg_queue_first(kmsgq);
 225             kmsg != IKM_NULL;
 226             kmsg = next) {
 227                next = ipc_kmsg_queue_next(kmsgq, kmsg);
 228
 229                for (;;) {
 230                        thread_t th;
 231
 232                        th = wait_queue_wakeup64_identity_locked(
 233                                                port_waitq,
 234                                                IPC_MQUEUE_RECEIVE,
 235                                                THREAD_AWAKENED,
 236                                                FALSE);
 237                        /* waitq/mqueue still locked, thread locked */
 238
 239                        if (th == THREAD_NULL)
 240                                goto leave;
 241
 242                        /*
 243                         * Found a receiver. see if they can handle the message
 244                         * correctly (the message is not too large for them, or
 245                         * they didn't care to be informed that the message was
 246                         * too large).  If they can't handle it, take them off
 247                         * the list and let them go back and figure it out and
 248                         * just move onto the next.
 249                         */
 250                        if (th->ith_msize <
 251                            kmsg->ikm_header->msgh_size +
 252                            REQUESTED_TRAILER_SIZE(th->ith_option)) {
 253                                th->ith_state = MACH_RCV_TOO_LARGE;
 254                                th->ith_msize = kmsg->ikm_header->msgh_size;
 255                                if (th->ith_option & MACH_RCV_LARGE) {
 256                                        /*
 257                                         * let him go without message
 258                                         */
 259                                        th->ith_kmsg = IKM_NULL;
 260                                        th->ith_seqno = 0;
 261                                        thread_unlock(th);
 262                                        continue; /* find another thread */
 263                                }
 264                        } else {
 265                                th->ith_state = MACH_MSG_SUCCESS;
 266                        }
 267
 268                        /*
 269                         * This thread is going to take this message,
 270                         * so give it to him.
 271                         */
 272                        ipc_kmsg_rmqueue(kmsgq, kmsg);
 273                        ipc_mqueue_release_msgcount(port_mqueue);
 274
 275                        th->ith_kmsg = kmsg;
 276                        th->ith_seqno = port_mqueue->imq_seqno++;
 277                        thread_unlock(th);
 278                        break;  /* go to next message */
 279                }
 280                        
 281        }
 282 leave:
 283        imq_unlock(port_mqueue);
 284        splx(s);
 285        return KERN_SUCCESS;
 286}
 287
 288/*
 289 *      Routine:        ipc_mqueue_changed
 290 *      Purpose:
 291 *              Wake up receivers waiting in a message queue.
 292 *      Conditions:
 293 *              The message queue is locked.
 294 */
 295
 296void
 297ipc_mqueue_changed(
 298        ipc_mqueue_t            mqueue)
 299{
 300        wait_queue_wakeup64_all_locked(
 301                                &mqueue->imq_wait_queue,
 302                                IPC_MQUEUE_RECEIVE,
 303                                THREAD_RESTART,
 304                                FALSE);         /* unlock waitq? */
 305}
 306
 307
 308                
 309
 310/*
 311 *      Routine:        ipc_mqueue_send
 312 *      Purpose:
 313 *              Send a message to a message queue.  The message holds a reference
 314 *              for the destination port for this message queue in the 
 315 *              msgh_remote_port field.
 316 *
 317 *              If unsuccessful, the caller still has possession of
 318 *              the message and must do something with it.  If successful,
 319 *              the message is queued, given to a receiver, or destroyed.
 320 *      Conditions:
 321 *              Nothing locked.
 322 *      Returns:
 323 *              MACH_MSG_SUCCESS        The message was accepted.
 324 *              MACH_SEND_TIMED_OUT     Caller still has message.
 325 *              MACH_SEND_INTERRUPTED   Caller still has message.
 326 */
 327mach_msg_return_t
 328ipc_mqueue_send(
 329        ipc_mqueue_t            mqueue,
 330        ipc_kmsg_t                      kmsg,
 331        mach_msg_option_t       option,
 332        mach_msg_timeout_t      send_timeout)
 333{
 334        int wresult;
 335        spl_t s;
 336
 337        /*
 338         *  Don't block if:
 339         *      1) We're under the queue limit.
 340         *      2) Caller used the MACH_SEND_ALWAYS internal option.
 341         *      3) Message is sent to a send-once right.
 342         */
 343        s = splsched();
 344        imq_lock(mqueue);
 345
 346        if (!imq_full(mqueue) ||
 347                (option & MACH_SEND_ALWAYS) ||
 348                (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
 349                 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
 350                mqueue->imq_msgcount++;
 351                assert(mqueue->imq_msgcount > 0);
 352                imq_unlock(mqueue);
 353                splx(s);
 354        } else {
 355                thread_t cur_thread = current_thread();
 356                uint64_t deadline;
 357
 358                /* 
 359                 * We have to wait for space to be granted to us.
 360                 */
 361                if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
 362                        imq_unlock(mqueue);
 363                        splx(s);
 364                        return MACH_SEND_TIMED_OUT;
 365                }
 366                mqueue->imq_fullwaiters = TRUE;
 367                thread_lock(cur_thread);
 368                if (option & MACH_SEND_TIMEOUT)
 369                        clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
 370                else
 371                        deadline = 0;
 372                wresult = wait_queue_assert_wait64_locked(
 373                                                &mqueue->imq_wait_queue,
 374                                                IPC_MQUEUE_FULL,
 375                                                THREAD_ABORTSAFE, deadline,
 376                                                cur_thread);
 377                thread_unlock(cur_thread);
 378                imq_unlock(mqueue);
 379                splx(s);
 380                
 381                if (wresult == THREAD_WAITING) {
 382                        wresult = thread_block(THREAD_CONTINUE_NULL);
 383                        counter(c_ipc_mqueue_send_block++);
 384                }
 385                
 386                switch (wresult) {
 387                case THREAD_TIMED_OUT:
 388                        assert(option & MACH_SEND_TIMEOUT);
 389                        return MACH_SEND_TIMED_OUT;
 390                        
 391                case THREAD_AWAKENED:
 392                        /* we can proceed - inherited msgcount from waker */
 393                        assert(mqueue->imq_msgcount > 0);
 394                        break;
 395                        
 396                case THREAD_INTERRUPTED:
 397                        return MACH_SEND_INTERRUPTED;
 398                        
 399                case THREAD_RESTART:
 400                default:
 401                        panic("ipc_mqueue_send");
 402                }
 403        }
 404
 405        ipc_mqueue_post(mqueue, kmsg);
 406        return MACH_MSG_SUCCESS;
 407}
 408
 409/*
 410 *      Routine:        ipc_mqueue_release_msgcount
 411 *      Purpose:
 412 *              Release a message queue reference in the case where we
 413 *              found a waiter.
 414 *
 415 *      Conditions:
 416 *              The message queue is locked.
 417 *              The message corresponding to this reference is off the queue.
 418 */
 419void
 420ipc_mqueue_release_msgcount(
 421        ipc_mqueue_t mqueue)    
 422{
 423        assert(imq_held(mqueue));
 424        assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
 425
 426        mqueue->imq_msgcount--;
 427
 428        if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
 429                if (wait_queue_wakeup64_one_locked(
 430                                                &mqueue->imq_wait_queue,
 431                                                IPC_MQUEUE_FULL,
 432                                                THREAD_AWAKENED,
 433                                                FALSE) != KERN_SUCCESS) {
 434                        mqueue->imq_fullwaiters = FALSE;
 435                } else {
 436                        /* gave away our slot - add reference back */
 437                        mqueue->imq_msgcount++; 
 438                }
 439        }
 440}
 441
 442/*
 443 *      Routine:        ipc_mqueue_post
 444 *      Purpose:
 445 *              Post a message to a waiting receiver or enqueue it.  If a
 446 *              receiver is waiting, we can release our reserved space in
 447 *              the message queue.
 448 *
 449 *      Conditions:
 450 *              If we need to queue, our space in the message queue is reserved.
 451 */
 452void
 453ipc_mqueue_post(
 454        register ipc_mqueue_t   mqueue,
 455        register ipc_kmsg_t             kmsg)
 456{
 457
 458        spl_t s;
 459
 460        /*
 461         *      While the msg queue     is locked, we have control of the
 462         *  kmsg, so the ref in it for the port is still good.
 463         *
 464         *      Check for a receiver for the message.
 465         */
 466        s = splsched();
 467        imq_lock(mqueue);
 468        for (;;) {
 469                wait_queue_t waitq = &mqueue->imq_wait_queue;
 470                thread_t receiver;
 471
 472                receiver = wait_queue_wakeup64_identity_locked(
 473                                                        waitq,
 474                                                        IPC_MQUEUE_RECEIVE,
 475                                                        THREAD_AWAKENED,
 476                                                        FALSE);
 477                /* waitq still locked, thread locked */
 478
 479                if (receiver == THREAD_NULL) {
 480                        /* 
 481                         * no receivers; queue kmsg
 482                         */
 483                        assert(mqueue->imq_msgcount > 0);
 484                        ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
 485                        break;
 486                }
 487                
 488                /*
 489                 * We found a waiting thread.
 490                 * If the message is too large or the scatter list is too small
 491                 * the thread we wake up will get that as its status.
 492                 */
 493                if (receiver->ith_msize <
 494                    (kmsg->ikm_header->msgh_size) +
 495                    REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
 496                        receiver->ith_msize = kmsg->ikm_header->msgh_size;
 497                        receiver->ith_state = MACH_RCV_TOO_LARGE;
 498                } else {
 499                        receiver->ith_state = MACH_MSG_SUCCESS;
 500                }
 501
 502                /*
 503                 * If there is no problem with the upcoming receive, or the
 504                 * receiver thread didn't specifically ask for special too
 505                 * large error condition, go ahead and select it anyway.
 506                 */
 507                if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
 508                    !(receiver->ith_option & MACH_RCV_LARGE)) {
 509
 510                        receiver->ith_kmsg = kmsg;
 511                        receiver->ith_seqno = mqueue->imq_seqno++;
 512                        thread_unlock(receiver);
 513
 514                        /* we didn't need our reserved spot in the queue */
 515                        ipc_mqueue_release_msgcount(mqueue);
 516                        break;
 517                }
 518
 519                /*
 520                 * Otherwise, this thread needs to be released to run
 521                 * and handle its error without getting the message.  We
 522                 * need to go back and pick another one.
 523                 */
 524                receiver->ith_kmsg = IKM_NULL;
 525                receiver->ith_seqno = 0;
 526                thread_unlock(receiver);
 527        }
 528
 529        imq_unlock(mqueue);
 530        splx(s);
 531        
 532        current_task()->messages_sent++;
 533        return;
 534}
 535
 536
 537/* static */ void
 538ipc_mqueue_receive_results(wait_result_t saved_wait_result)
 539{
 540        thread_t                self = current_thread();
 541        mach_msg_option_t       option = self->ith_option;
 542        kern_return_t           mr;
 543
 544        /*
 545         * why did we wake up?
 546         */
 547        switch (saved_wait_result) {
 548        case THREAD_TIMED_OUT:
 549                self->ith_state = MACH_RCV_TIMED_OUT;
 550                return;
 551
 552        case THREAD_INTERRUPTED:
 553                self->ith_state = MACH_RCV_INTERRUPTED;
 554                return;
 555
 556        case THREAD_RESTART:
 557                /* something bad happened to the port/set */
 558                self->ith_state = MACH_RCV_PORT_CHANGED;
 559                return;
 560
 561        case THREAD_AWAKENED:
 562                /*
 563                 * We do not need to go select a message, somebody
 564                 * handed us one (or a too-large indication).
 565                 */
 566                mr = MACH_MSG_SUCCESS;
 567
 568                switch (self->ith_state) {
 569                case MACH_RCV_SCATTER_SMALL:
 570                case MACH_RCV_TOO_LARGE:
 571                        /*
 572                         * Somebody tried to give us a too large
 573                         * message. If we indicated that we cared,
 574                         * then they only gave us the indication,
 575                         * otherwise they gave us the indication
 576                         * AND the message anyway.
 577                         */
 578                        if (option & MACH_RCV_LARGE) {
 579                                return;
 580                        }
 581
 582                case MACH_MSG_SUCCESS:
 583                        return;
 584
 585                default:
 586                        panic("ipc_mqueue_receive_results: strange ith_state");
 587                }
 588
 589        default:
 590                panic("ipc_mqueue_receive_results: strange wait_result");
 591        }
 592}
 593
 594void
 595ipc_mqueue_receive_continue(
 596        __unused void *param,
 597        wait_result_t wresult)
 598{
 599        ipc_mqueue_receive_results(wresult);
 600        mach_msg_receive_continue();  /* hard-coded for now */
 601}
 602
 603/*
 604 *      Routine:        ipc_mqueue_receive
 605 *      Purpose:
 606 *              Receive a message from a message queue.
 607 *
 608 *              If continuation is non-zero, then we might discard
 609 *              our kernel stack when we block.  We will continue
 610 *              after unblocking by executing continuation.
 611 *
 612 *              If resume is true, then we are resuming a receive
 613 *              operation after a blocked receive discarded our stack.
 614 *      Conditions:
 615 *              Our caller must hold a reference for the port or port set
 616 *              to which this queue belongs, to keep the queue
 617 *              from being deallocated.
 618 *
 619 *              The kmsg is returned with clean header fields
 620 *              and with the circular bit turned off.
 621 *      Returns:
 622 *              MACH_MSG_SUCCESS        Message returned in kmsgp.
 623 *              MACH_RCV_TOO_LARGE      Message size returned in kmsgp.
 624 *              MACH_RCV_TIMED_OUT      No message obtained.
 625 *              MACH_RCV_INTERRUPTED    No message obtained.
 626 *              MACH_RCV_PORT_DIED      Port/set died; no message.
 627 *              MACH_RCV_PORT_CHANGED   Port moved into set; no msg.
 628 *
 629 */
 630
 631void
 632ipc_mqueue_receive(
 633        ipc_mqueue_t         mqueue,
 634        mach_msg_option_t    option,
 635        mach_msg_size_t      max_size,
 636        mach_msg_timeout_t   rcv_timeout,
 637        int                  interruptible)
 638{
 639        ipc_kmsg_queue_t        kmsgs;
 640        wait_result_t           wresult;
 641        thread_t                self;
 642        uint64_t                                deadline;
 643        spl_t                   s;
 644
 645        s = splsched();
 646        imq_lock(mqueue);
 647        
 648        if (imq_is_set(mqueue)) {
 649                wait_queue_link_t wql;
 650                ipc_mqueue_t port_mq;
 651                queue_t q;
 652
 653                q = &mqueue->imq_setlinks;
 654
 655                /*
 656                 * If we are waiting on a portset mqueue, we need to see if
 657                 * any of the member ports have work for us.  If so, try to
 658                 * deliver one of those messages. By holding the portset's
 659                 * mqueue lock during the search, we tie up any attempts by
 660                 * mqueue_deliver or portset membership changes that may
 661                 * cross our path. But this is a lock order violation, so we
 662                 * have to do it "softly."  If we don't find a message waiting
 663                 * for us, we will assert our intention to wait while still
 664                 * holding that lock.  When we release the lock, the deliver/
 665                 * change will succeed and find us.
 666                 */
 667        search_set:
 668                queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
 669                        port_mq = (ipc_mqueue_t)wql->wql_queue;
 670                        kmsgs = &port_mq->imq_messages;
 671                        
 672                        if (!imq_lock_try(port_mq)) {
 673                                imq_unlock(mqueue);
 674                                splx(s);
 675                                delay(1);
 676                                s = splsched();
 677                                imq_lock(mqueue);
 678                                goto search_set; /* start again at beginning - SMP */
 679                        }
 680
 681                        /*
 682                         * If there is still a message to be had, we will
 683                         * try to select it (may not succeed because of size
 684                         * and options).  In any case, we deliver those
 685                         * results back to the user.
 686                         *
 687                         * We also move the port's linkage to the tail of the
 688                         * list for this set (fairness). Future versions will
 689                         * sort by timestamp or priority.
 690                         */
 691                        if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
 692                                imq_unlock(port_mq);
 693                                continue;
 694                        }
 695                        queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
 696                        queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
 697                        imq_unlock(mqueue);
 698
 699                        ipc_mqueue_select(port_mq, option, max_size);
 700                        imq_unlock(port_mq);
 701                        splx(s);
 702                        return;
 703                        
 704                }
 705
 706        } else {
 707
 708                /*
 709                 * Receive on a single port. Just try to get the messages.
 710                 */
 711                kmsgs = &mqueue->imq_messages;
 712                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
 713                        ipc_mqueue_select(mqueue, option, max_size);
 714                        imq_unlock(mqueue);
 715                        splx(s);
 716                        return;
 717                }
 718        }
 719                
 720        /*
 721         * Looks like we'll have to block.  The mqueue we will
 722         * block on (whether the set's or the local port's) is
 723         * still locked.
 724         */
 725        self = current_thread();
 726        if (option & MACH_RCV_TIMEOUT) {
 727                if (rcv_timeout == 0) {
 728                        imq_unlock(mqueue);
 729                        splx(s);
 730                        self->ith_state = MACH_RCV_TIMED_OUT;
 731                        return;
 732                }
 733        }
 734
 735        thread_lock(self);
 736        self->ith_state = MACH_RCV_IN_PROGRESS;
 737        self->ith_option = option;
 738        self->ith_msize = max_size;
 739
 740        if (option & MACH_RCV_TIMEOUT)
 741                clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
 742        else
 743                deadline = 0;
 744
 745        wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
 746                                                                                                IPC_MQUEUE_RECEIVE,
 747                                                                                                interruptible, deadline,
 748                                                                                                self);
 749        thread_unlock(self);
 750        imq_unlock(mqueue);
 751        splx(s);
 752
 753        if (wresult == THREAD_WAITING) {
 754                counter((interruptible == THREAD_ABORTSAFE) ? 
 755                        c_ipc_mqueue_receive_block_user++ :
 756                        c_ipc_mqueue_receive_block_kernel++);
 757
 758                if (self->ith_continuation)
 759                        thread_block(ipc_mqueue_receive_continue);
 760                        /* NOTREACHED */
 761
 762                wresult = thread_block(THREAD_CONTINUE_NULL);
 763        }
 764        ipc_mqueue_receive_results(wresult);
 765}
 766
 767
 768/*
 769 *      Routine:        ipc_mqueue_select
 770 *      Purpose:
 771 *              A receiver discovered that there was a message on the queue
 772 *              before he had to block.  Pick the message off the queue and
 773 *              "post" it to himself.
 774 *      Conditions:
 775 *              mqueue locked.
 776 *              There is a message.
 777 *      Returns:
 778 *              MACH_MSG_SUCCESS        Actually selected a message for ourselves.
 779 *              MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
 780 */
 781void
 782ipc_mqueue_select(
 783        ipc_mqueue_t            mqueue,
 784        mach_msg_option_t       option,
 785        mach_msg_size_t         max_size)
 786{
 787        thread_t self = current_thread();
 788        ipc_kmsg_t kmsg;
 789        mach_msg_return_t mr;
 790        mach_msg_size_t rcv_size;
 791
 792        mr = MACH_MSG_SUCCESS;
 793
 794
 795        /*
 796         * Do some sanity checking of our ability to receive
 797         * before pulling the message off the queue.
 798         */
 799        kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
 800        assert(kmsg != IKM_NULL);
 801
 802        /*
 803         * If we really can't receive it, but we had the
 804         * MACH_RCV_LARGE option set, then don't take it off
 805         * the queue, instead return the appropriate error
 806         * (and size needed).
 807         */
 808        rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
 809        if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
 810                mr = MACH_RCV_TOO_LARGE;
 811                if (option & MACH_RCV_LARGE) {
 812                        self->ith_kmsg = IKM_NULL;
 813                        self->ith_msize = rcv_size;
 814                        self->ith_seqno = 0;
 815                        self->ith_state = mr;
 816                        return;
 817                }
 818        }
 819
 820        ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
 821        ipc_mqueue_release_msgcount(mqueue);
 822        self->ith_seqno = mqueue->imq_seqno++;
 823        self->ith_kmsg = kmsg;
 824        self->ith_state = mr;
 825
 826        current_task()->messages_received++;
 827        return;
 828}
 829
 830/*
 831 *      Routine:        ipc_mqueue_destroy
 832 *      Purpose:
 833 *              Destroy a message queue.  Set any blocked senders running.
 834 *              Destroy the kmsgs in the queue.
 835 *      Conditions:
 836 *              Nothing locked.
 837 *              Receivers were removed when the receive right was "changed"
 838 */
 839void
 840ipc_mqueue_destroy(
 841        ipc_mqueue_t    mqueue) 
 842{
 843        ipc_kmsg_queue_t kmqueue;
 844        ipc_kmsg_t kmsg;
 845        spl_t s;
 846
 847
 848        s = splsched();
 849        imq_lock(mqueue);
 850        /*
 851         *      rouse all blocked senders
 852         */
 853        mqueue->imq_fullwaiters = FALSE;
 854        wait_queue_wakeup64_all_locked(
 855                                &mqueue->imq_wait_queue,
 856                                IPC_MQUEUE_FULL,
 857                                THREAD_AWAKENED,
 858                                FALSE);
 859
 860        kmqueue = &mqueue->imq_messages;
 861
 862        while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
 863                imq_unlock(mqueue);
 864                splx(s);
 865
 866                ipc_kmsg_destroy_dest(kmsg);
 867
 868                s = splsched();
 869                imq_lock(mqueue);
 870        }
 871        imq_unlock(mqueue);
 872        splx(s);
 873}
 874
 875/*
 876 *      Routine:        ipc_mqueue_set_qlimit
 877 *      Purpose:
 878 *              Changes a message queue limit; the maximum number
 879 *              of messages which may be queued.
 880 *      Conditions:
 881 *              Nothing locked.
 882 */
 883
 884void
 885ipc_mqueue_set_qlimit(
 886         ipc_mqueue_t                   mqueue,
 887         mach_port_msgcount_t   qlimit)
 888{
 889         spl_t s;
 890
 891         assert(qlimit <= MACH_PORT_QLIMIT_MAX);
 892
 893         /* wake up senders allowed by the new qlimit */
 894         s = splsched();
 895         imq_lock(mqueue);
 896         if (qlimit > mqueue->imq_qlimit) {
 897                 mach_port_msgcount_t i, wakeup;
 898
 899                 /* caution: wakeup, qlimit are unsigned */
 900                 wakeup = qlimit - mqueue->imq_qlimit;
 901
 902                 for (i = 0; i < wakeup; i++) {
 903                         if (wait_queue_wakeup64_one_locked(
 904                                                        &mqueue->imq_wait_queue,
 905                                                        IPC_MQUEUE_FULL,
 906                                                        THREAD_AWAKENED,
 907                                                        FALSE) == KERN_NOT_WAITING) {
 908                                         mqueue->imq_fullwaiters = FALSE;
 909                                         break;
 910                         }
 911                         mqueue->imq_msgcount++;  /* give it to the awakened thread */
 912                 }
 913         }
 914        mqueue->imq_qlimit = qlimit;
 915        imq_unlock(mqueue);
 916        splx(s);
 917}
 918
 919/*
 920 *      Routine:        ipc_mqueue_set_seqno
 921 *      Purpose:
 922 *              Changes an mqueue's sequence number.
 923 *      Conditions:
 924 *              Caller holds a reference to the queue's containing object.
 925 */
 926void
 927ipc_mqueue_set_seqno(
 928        ipc_mqueue_t            mqueue,
 929        mach_port_seqno_t       seqno)
 930{
 931        spl_t s;
 932
 933        s = splsched();
 934        imq_lock(mqueue);
 935        mqueue->imq_seqno = seqno;
 936        imq_unlock(mqueue);
 937        splx(s);
 938}
 939
 940
 941/*
 942 *      Routine:        ipc_mqueue_copyin
 943 *      Purpose:
 944 *              Convert a name in a space to a message queue.
 945 *      Conditions:
 946 *              Nothing locked.  If successful, the caller gets a ref for
 947 *              for the object. This ref ensures the continued existence of
 948 *              the queue.
 949 *      Returns:
 950 *              MACH_MSG_SUCCESS        Found a message queue.
 951 *              MACH_RCV_INVALID_NAME   The space is dead.
 952 *              MACH_RCV_INVALID_NAME   The name doesn't denote a right.
 953 *              MACH_RCV_INVALID_NAME
 954 *                      The denoted right is not receive or port set.
 955 *              MACH_RCV_IN_SET         Receive right is a member of a set.
 956 */
 957
 958mach_msg_return_t
 959ipc_mqueue_copyin(
 960        ipc_space_t             space,
 961        mach_port_name_t        name,
 962        ipc_mqueue_t            *mqueuep,
 963        ipc_object_t            *objectp)
 964{
 965        ipc_entry_t entry;
 966        ipc_object_t object;
 967        ipc_mqueue_t mqueue;
 968
 969        is_read_lock(space);
 970        if (!space->is_active) {
 971                is_read_unlock(space);
 972                return MACH_RCV_INVALID_NAME;
 973        }
 974
 975        entry = ipc_entry_lookup(space, name);
 976        if (entry == IE_NULL) {
 977                is_read_unlock(space);
 978                return MACH_RCV_INVALID_NAME;
 979        }
 980
 981        object = entry->ie_object;
 982
 983        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
 984                ipc_port_t port;
 985
 986                port = (ipc_port_t) object;
 987                assert(port != IP_NULL);
 988
 989                ip_lock(port);
 990                assert(ip_active(port));
 991                assert(port->ip_receiver_name == name);
 992                assert(port->ip_receiver == space);
 993                is_read_unlock(space);
 994                mqueue = &port->ip_messages;
 995
 996        } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
 997                ipc_pset_t pset;
 998
 999                pset = (ipc_pset_t) object;
1000                assert(pset != IPS_NULL);
1001
1002                ips_lock(pset);
1003                assert(ips_active(pset));
1004                assert(pset->ips_local_name == name);
1005                is_read_unlock(space);
1006
1007                mqueue = &pset->ips_messages;
1008        } else {
1009                is_read_unlock(space);
1010                return MACH_RCV_INVALID_NAME;
1011        }
1012
1013        /*
1014         *      At this point, the object is locked and active,
1015         *      the space is unlocked, and mqueue is initialized.
1016         */
1017
1018        io_reference(object);
1019        io_unlock(object);
1020
1021        *objectp = object;
1022        *mqueuep = mqueue;
1023        return MACH_MSG_SUCCESS;
1024}
1025
1026
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.