1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60#include <mach/port.h>
61#include <mach/message.h>
62#include <mach/sync_policy.h>
63
64#include <kern/assert.h>
65#include <kern/counters.h>
66#include <kern/sched_prim.h>
67#include <kern/ipc_kobject.h>
68#include <kern/ipc_mig.h>
69#include <kern/misc_protos.h>
70#include <kern/task.h>
71#include <kern/thread.h>
72#include <kern/wait_queue.h>
73
74#include <ipc/ipc_mqueue.h>
75#include <ipc/ipc_kmsg.h>
76#include <ipc/ipc_port.h>
77#include <ipc/ipc_pset.h>
78#include <ipc/ipc_space.h>
79
80#include <ddb/tr.h>
81
82int ipc_mqueue_full;
83int ipc_mqueue_rcv;
84
85#define TR_ENABLE 0
86
87
88void ipc_mqueue_receive_results(wait_result_t result);
89
90
91
92
93
94
95void
96ipc_mqueue_init(
97 ipc_mqueue_t mqueue,
98 boolean_t is_set)
99{
100 if (is_set) {
101 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
102 } else {
103 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
104 ipc_kmsg_queue_init(&mqueue->imq_messages);
105 mqueue->imq_seqno = 0;
106 mqueue->imq_msgcount = 0;
107 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
108 mqueue->imq_fullwaiters = FALSE;
109 }
110}
111
112
113
114
115
116
117
118
119
120
121
122
123
124boolean_t
125ipc_mqueue_member(
126 ipc_mqueue_t port_mqueue,
127 ipc_mqueue_t set_mqueue)
128{
129 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
130 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
131
132 return (wait_queue_member(port_waitq, set_waitq));
133
134}
135
136
137
138
139
140
141
142
143kern_return_t
144ipc_mqueue_remove(
145 ipc_mqueue_t mqueue,
146 ipc_mqueue_t set_mqueue)
147{
148 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
149 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
150
151 return wait_queue_unlink(mq_waitq, set_waitq);
152}
153
154
155
156
157
158
159
160
161void
162ipc_mqueue_remove_from_all(
163 ipc_mqueue_t mqueue)
164{
165 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
166
167 wait_queue_unlink_all(mq_waitq);
168 return;
169}
170
171
172
173
174
175
176
177
178void
179ipc_mqueue_remove_all(
180 ipc_mqueue_t mqueue)
181{
182 wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
183
184 wait_queue_set_unlink_all(mq_setq);
185 return;
186}
187
188
189
190
191
192
193
194
195
196
197
198
199
200kern_return_t
201ipc_mqueue_add(
202 ipc_mqueue_t port_mqueue,
203 ipc_mqueue_t set_mqueue)
204{
205 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
206 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
207 ipc_kmsg_queue_t kmsgq;
208 ipc_kmsg_t kmsg, next;
209 kern_return_t kr;
210 spl_t s;
211
212 kr = wait_queue_link(port_waitq, set_waitq);
213 if (kr != KERN_SUCCESS)
214 return kr;
215
216
217
218
219
220
221 s = splsched();
222 imq_lock(port_mqueue);
223 kmsgq = &port_mqueue->imq_messages;
224 for (kmsg = ipc_kmsg_queue_first(kmsgq);
225 kmsg != IKM_NULL;
226 kmsg = next) {
227 next = ipc_kmsg_queue_next(kmsgq, kmsg);
228
229 for (;;) {
230 thread_t th;
231
232 th = wait_queue_wakeup64_identity_locked(
233 port_waitq,
234 IPC_MQUEUE_RECEIVE,
235 THREAD_AWAKENED,
236 FALSE);
237
238
239 if (th == THREAD_NULL)
240 goto leave;
241
242
243
244
245
246
247
248
249
250 if (th->ith_msize <
251 kmsg->ikm_header->msgh_size +
252 REQUESTED_TRAILER_SIZE(th->ith_option)) {
253 th->ith_state = MACH_RCV_TOO_LARGE;
254 th->ith_msize = kmsg->ikm_header->msgh_size;
255 if (th->ith_option & MACH_RCV_LARGE) {
256
257
258
259 th->ith_kmsg = IKM_NULL;
260 th->ith_seqno = 0;
261 thread_unlock(th);
262 continue;
263 }
264 } else {
265 th->ith_state = MACH_MSG_SUCCESS;
266 }
267
268
269
270
271
272 ipc_kmsg_rmqueue(kmsgq, kmsg);
273 ipc_mqueue_release_msgcount(port_mqueue);
274
275 th->ith_kmsg = kmsg;
276 th->ith_seqno = port_mqueue->imq_seqno++;
277 thread_unlock(th);
278 break;
279 }
280
281 }
282 leave:
283 imq_unlock(port_mqueue);
284 splx(s);
285 return KERN_SUCCESS;
286}
287
288
289
290
291
292
293
294
295
296void
297ipc_mqueue_changed(
298 ipc_mqueue_t mqueue)
299{
300 wait_queue_wakeup64_all_locked(
301 &mqueue->imq_wait_queue,
302 IPC_MQUEUE_RECEIVE,
303 THREAD_RESTART,
304 FALSE);
305}
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327mach_msg_return_t
328ipc_mqueue_send(
329 ipc_mqueue_t mqueue,
330 ipc_kmsg_t kmsg,
331 mach_msg_option_t option,
332 mach_msg_timeout_t send_timeout)
333{
334 int wresult;
335 spl_t s;
336
337
338
339
340
341
342
343 s = splsched();
344 imq_lock(mqueue);
345
346 if (!imq_full(mqueue) ||
347 (option & MACH_SEND_ALWAYS) ||
348 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
349 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
350 mqueue->imq_msgcount++;
351 assert(mqueue->imq_msgcount > 0);
352 imq_unlock(mqueue);
353 splx(s);
354 } else {
355 thread_t cur_thread = current_thread();
356 uint64_t deadline;
357
358
359
360
361 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
362 imq_unlock(mqueue);
363 splx(s);
364 return MACH_SEND_TIMED_OUT;
365 }
366 mqueue->imq_fullwaiters = TRUE;
367 thread_lock(cur_thread);
368 if (option & MACH_SEND_TIMEOUT)
369 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
370 else
371 deadline = 0;
372 wresult = wait_queue_assert_wait64_locked(
373 &mqueue->imq_wait_queue,
374 IPC_MQUEUE_FULL,
375 THREAD_ABORTSAFE, deadline,
376 cur_thread);
377 thread_unlock(cur_thread);
378 imq_unlock(mqueue);
379 splx(s);
380
381 if (wresult == THREAD_WAITING) {
382 wresult = thread_block(THREAD_CONTINUE_NULL);
383 counter(c_ipc_mqueue_send_block++);
384 }
385
386 switch (wresult) {
387 case THREAD_TIMED_OUT:
388 assert(option & MACH_SEND_TIMEOUT);
389 return MACH_SEND_TIMED_OUT;
390
391 case THREAD_AWAKENED:
392
393 assert(mqueue->imq_msgcount > 0);
394 break;
395
396 case THREAD_INTERRUPTED:
397 return MACH_SEND_INTERRUPTED;
398
399 case THREAD_RESTART:
400 default:
401 panic("ipc_mqueue_send");
402 }
403 }
404
405 ipc_mqueue_post(mqueue, kmsg);
406 return MACH_MSG_SUCCESS;
407}
408
409
410
411
412
413
414
415
416
417
418
419void
420ipc_mqueue_release_msgcount(
421 ipc_mqueue_t mqueue)
422{
423 assert(imq_held(mqueue));
424 assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
425
426 mqueue->imq_msgcount--;
427
428 if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
429 if (wait_queue_wakeup64_one_locked(
430 &mqueue->imq_wait_queue,
431 IPC_MQUEUE_FULL,
432 THREAD_AWAKENED,
433 FALSE) != KERN_SUCCESS) {
434 mqueue->imq_fullwaiters = FALSE;
435 } else {
436
437 mqueue->imq_msgcount++;
438 }
439 }
440}
441
442
443
444
445
446
447
448
449
450
451
452void
453ipc_mqueue_post(
454 register ipc_mqueue_t mqueue,
455 register ipc_kmsg_t kmsg)
456{
457
458 spl_t s;
459
460
461
462
463
464
465
466 s = splsched();
467 imq_lock(mqueue);
468 for (;;) {
469 wait_queue_t waitq = &mqueue->imq_wait_queue;
470 thread_t receiver;
471
472 receiver = wait_queue_wakeup64_identity_locked(
473 waitq,
474 IPC_MQUEUE_RECEIVE,
475 THREAD_AWAKENED,
476 FALSE);
477
478
479 if (receiver == THREAD_NULL) {
480
481
482
483 assert(mqueue->imq_msgcount > 0);
484 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
485 break;
486 }
487
488
489
490
491
492
493 if (receiver->ith_msize <
494 (kmsg->ikm_header->msgh_size) +
495 REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
496 receiver->ith_msize = kmsg->ikm_header->msgh_size;
497 receiver->ith_state = MACH_RCV_TOO_LARGE;
498 } else {
499 receiver->ith_state = MACH_MSG_SUCCESS;
500 }
501
502
503
504
505
506
507 if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
508 !(receiver->ith_option & MACH_RCV_LARGE)) {
509
510 receiver->ith_kmsg = kmsg;
511 receiver->ith_seqno = mqueue->imq_seqno++;
512 thread_unlock(receiver);
513
514
515 ipc_mqueue_release_msgcount(mqueue);
516 break;
517 }
518
519
520
521
522
523
524 receiver->ith_kmsg = IKM_NULL;
525 receiver->ith_seqno = 0;
526 thread_unlock(receiver);
527 }
528
529 imq_unlock(mqueue);
530 splx(s);
531
532 current_task()->messages_sent++;
533 return;
534}
535
536
537 void
538ipc_mqueue_receive_results(wait_result_t saved_wait_result)
539{
540 thread_t self = current_thread();
541 mach_msg_option_t option = self->ith_option;
542 kern_return_t mr;
543
544
545
546
547 switch (saved_wait_result) {
548 case THREAD_TIMED_OUT:
549 self->ith_state = MACH_RCV_TIMED_OUT;
550 return;
551
552 case THREAD_INTERRUPTED:
553 self->ith_state = MACH_RCV_INTERRUPTED;
554 return;
555
556 case THREAD_RESTART:
557
558 self->ith_state = MACH_RCV_PORT_CHANGED;
559 return;
560
561 case THREAD_AWAKENED:
562
563
564
565
566 mr = MACH_MSG_SUCCESS;
567
568 switch (self->ith_state) {
569 case MACH_RCV_SCATTER_SMALL:
570 case MACH_RCV_TOO_LARGE:
571
572
573
574
575
576
577
578 if (option & MACH_RCV_LARGE) {
579 return;
580 }
581
582 case MACH_MSG_SUCCESS:
583 return;
584
585 default:
586 panic("ipc_mqueue_receive_results: strange ith_state");
587 }
588
589 default:
590 panic("ipc_mqueue_receive_results: strange wait_result");
591 }
592}
593
594void
595ipc_mqueue_receive_continue(
596 __unused void *param,
597 wait_result_t wresult)
598{
599 ipc_mqueue_receive_results(wresult);
600 mach_msg_receive_continue();
601}
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631void
632ipc_mqueue_receive(
633 ipc_mqueue_t mqueue,
634 mach_msg_option_t option,
635 mach_msg_size_t max_size,
636 mach_msg_timeout_t rcv_timeout,
637 int interruptible)
638{
639 ipc_kmsg_queue_t kmsgs;
640 wait_result_t wresult;
641 thread_t self;
642 uint64_t deadline;
643 spl_t s;
644
645 s = splsched();
646 imq_lock(mqueue);
647
648 if (imq_is_set(mqueue)) {
649 wait_queue_link_t wql;
650 ipc_mqueue_t port_mq;
651 queue_t q;
652
653 q = &mqueue->imq_setlinks;
654
655
656
657
658
659
660
661
662
663
664
665
666
667 search_set:
668 queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
669 port_mq = (ipc_mqueue_t)wql->wql_queue;
670 kmsgs = &port_mq->imq_messages;
671
672 if (!imq_lock_try(port_mq)) {
673 imq_unlock(mqueue);
674 splx(s);
675 delay(1);
676 s = splsched();
677 imq_lock(mqueue);
678 goto search_set;
679 }
680
681
682
683
684
685
686
687
688
689
690
691 if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
692 imq_unlock(port_mq);
693 continue;
694 }
695 queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
696 queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
697 imq_unlock(mqueue);
698
699 ipc_mqueue_select(port_mq, option, max_size);
700 imq_unlock(port_mq);
701 splx(s);
702 return;
703
704 }
705
706 } else {
707
708
709
710
711 kmsgs = &mqueue->imq_messages;
712 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
713 ipc_mqueue_select(mqueue, option, max_size);
714 imq_unlock(mqueue);
715 splx(s);
716 return;
717 }
718 }
719
720
721
722
723
724
725 self = current_thread();
726 if (option & MACH_RCV_TIMEOUT) {
727 if (rcv_timeout == 0) {
728 imq_unlock(mqueue);
729 splx(s);
730 self->ith_state = MACH_RCV_TIMED_OUT;
731 return;
732 }
733 }
734
735 thread_lock(self);
736 self->ith_state = MACH_RCV_IN_PROGRESS;
737 self->ith_option = option;
738 self->ith_msize = max_size;
739
740 if (option & MACH_RCV_TIMEOUT)
741 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
742 else
743 deadline = 0;
744
745 wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
746 IPC_MQUEUE_RECEIVE,
747 interruptible, deadline,
748 self);
749 thread_unlock(self);
750 imq_unlock(mqueue);
751 splx(s);
752
753 if (wresult == THREAD_WAITING) {
754 counter((interruptible == THREAD_ABORTSAFE) ?
755 c_ipc_mqueue_receive_block_user++ :
756 c_ipc_mqueue_receive_block_kernel++);
757
758 if (self->ith_continuation)
759 thread_block(ipc_mqueue_receive_continue);
760
761
762 wresult = thread_block(THREAD_CONTINUE_NULL);
763 }
764 ipc_mqueue_receive_results(wresult);
765}
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781void
782ipc_mqueue_select(
783 ipc_mqueue_t mqueue,
784 mach_msg_option_t option,
785 mach_msg_size_t max_size)
786{
787 thread_t self = current_thread();
788 ipc_kmsg_t kmsg;
789 mach_msg_return_t mr;
790 mach_msg_size_t rcv_size;
791
792 mr = MACH_MSG_SUCCESS;
793
794
795
796
797
798
799 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
800 assert(kmsg != IKM_NULL);
801
802
803
804
805
806
807
808 rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
809 if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
810 mr = MACH_RCV_TOO_LARGE;
811 if (option & MACH_RCV_LARGE) {
812 self->ith_kmsg = IKM_NULL;
813 self->ith_msize = rcv_size;
814 self->ith_seqno = 0;
815 self->ith_state = mr;
816 return;
817 }
818 }
819
820 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
821 ipc_mqueue_release_msgcount(mqueue);
822 self->ith_seqno = mqueue->imq_seqno++;
823 self->ith_kmsg = kmsg;
824 self->ith_state = mr;
825
826 current_task()->messages_received++;
827 return;
828}
829
830
831
832
833
834
835
836
837
838
839void
840ipc_mqueue_destroy(
841 ipc_mqueue_t mqueue)
842{
843 ipc_kmsg_queue_t kmqueue;
844 ipc_kmsg_t kmsg;
845 spl_t s;
846
847
848 s = splsched();
849 imq_lock(mqueue);
850
851
852
853 mqueue->imq_fullwaiters = FALSE;
854 wait_queue_wakeup64_all_locked(
855 &mqueue->imq_wait_queue,
856 IPC_MQUEUE_FULL,
857 THREAD_AWAKENED,
858 FALSE);
859
860 kmqueue = &mqueue->imq_messages;
861
862 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
863 imq_unlock(mqueue);
864 splx(s);
865
866 ipc_kmsg_destroy_dest(kmsg);
867
868 s = splsched();
869 imq_lock(mqueue);
870 }
871 imq_unlock(mqueue);
872 splx(s);
873}
874
875
876
877
878
879
880
881
882
883
884void
885ipc_mqueue_set_qlimit(
886 ipc_mqueue_t mqueue,
887 mach_port_msgcount_t qlimit)
888{
889 spl_t s;
890
891 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
892
893
894 s = splsched();
895 imq_lock(mqueue);
896 if (qlimit > mqueue->imq_qlimit) {
897 mach_port_msgcount_t i, wakeup;
898
899
900 wakeup = qlimit - mqueue->imq_qlimit;
901
902 for (i = 0; i < wakeup; i++) {
903 if (wait_queue_wakeup64_one_locked(
904 &mqueue->imq_wait_queue,
905 IPC_MQUEUE_FULL,
906 THREAD_AWAKENED,
907 FALSE) == KERN_NOT_WAITING) {
908 mqueue->imq_fullwaiters = FALSE;
909 break;
910 }
911 mqueue->imq_msgcount++;
912 }
913 }
914 mqueue->imq_qlimit = qlimit;
915 imq_unlock(mqueue);
916 splx(s);
917}
918
919
920
921
922
923
924
925
926void
927ipc_mqueue_set_seqno(
928 ipc_mqueue_t mqueue,
929 mach_port_seqno_t seqno)
930{
931 spl_t s;
932
933 s = splsched();
934 imq_lock(mqueue);
935 mqueue->imq_seqno = seqno;
936 imq_unlock(mqueue);
937 splx(s);
938}
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958mach_msg_return_t
959ipc_mqueue_copyin(
960 ipc_space_t space,
961 mach_port_name_t name,
962 ipc_mqueue_t *mqueuep,
963 ipc_object_t *objectp)
964{
965 ipc_entry_t entry;
966 ipc_object_t object;
967 ipc_mqueue_t mqueue;
968
969 is_read_lock(space);
970 if (!space->is_active) {
971 is_read_unlock(space);
972 return MACH_RCV_INVALID_NAME;
973 }
974
975 entry = ipc_entry_lookup(space, name);
976 if (entry == IE_NULL) {
977 is_read_unlock(space);
978 return MACH_RCV_INVALID_NAME;
979 }
980
981 object = entry->ie_object;
982
983 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
984 ipc_port_t port;
985
986 port = (ipc_port_t) object;
987 assert(port != IP_NULL);
988
989 ip_lock(port);
990 assert(ip_active(port));
991 assert(port->ip_receiver_name == name);
992 assert(port->ip_receiver == space);
993 is_read_unlock(space);
994 mqueue = &port->ip_messages;
995
996 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
997 ipc_pset_t pset;
998
999 pset = (ipc_pset_t) object;
1000 assert(pset != IPS_NULL);
1001
1002 ips_lock(pset);
1003 assert(ips_active(pset));
1004 assert(pset->ips_local_name == name);
1005 is_read_unlock(space);
1006
1007 mqueue = &pset->ips_messages;
1008 } else {
1009 is_read_unlock(space);
1010 return MACH_RCV_INVALID_NAME;
1011 }
1012
1013
1014
1015
1016
1017
1018 io_reference(object);
1019 io_unlock(object);
1020
1021 *objectp = object;
1022 *mqueuep = mqueue;
1023 return MACH_MSG_SUCCESS;
1024}
1025
1026