linux/kernel/watch_queue.c
// SPDX-License-Identifier: GPL-2.0
/* Watch queue and general notification mechanism, built on pipes
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * See Documentation/core-api/watch_queue.rst
 */

#define pr_fmt(fmt) "watchq: " fmt
#include <linux/module.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/printk.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/poll.h>
#include <linux/uaccess.h>
#include <linux/vmalloc.h>
#include <linux/file.h>
#include <linux/security.h>
#include <linux/cred.h>
#include <linux/sched/signal.h>
#include <linux/watch_queue.h>
#include <linux/pipe_fs_i.h>

MODULE_DESCRIPTION("Watch queue");
MODULE_AUTHOR("Red Hat, Inc.");

#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
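
/*
 * Worked example (illustrative, assuming 4 KiB pages; not in the original
 * source): each preallocated page holds PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE =
 * 4096 / 128 = 32 notes, so a queue sized for 256 notifications needs
 * 256 / 32 = 8 pages.
 */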

/*
 * This must be called under the RCU read-lock, which makes
 * sure that the wqueue still exists. It can then take the lock,
 * and check that the wqueue hasn't been destroyed, which in
 * turn makes sure that the notification pipe still exists.
 */
static inline bool lock_wqueue(struct watch_queue *wqueue)
{
	spin_lock_bh(&wqueue->lock);
	if (unlikely(!wqueue->pipe)) {
		spin_unlock_bh(&wqueue->lock);
		return false;
	}
	return true;
}

static inline void unlock_wqueue(struct watch_queue *wqueue)
{
	spin_unlock_bh(&wqueue->lock);
}
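
/*
 * Illustrative sketch (not in the original source): the canonical caller
 * pattern pairs these helpers with the RCU read lock, e.g.:
 *
 *	rcu_read_lock();
 *	wqueue = rcu_dereference(watch->queue);
 *	if (lock_wqueue(wqueue)) {
 *		post_one_notification(wqueue, n);
 *		unlock_wqueue(wqueue);
 *	}
 *	rcu_read_unlock();
 *
 * If lock_wqueue() fails, the queue has been torn down and the event is
 * simply dropped.
 */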

static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe,
					 struct pipe_buffer *buf)
{
	struct watch_queue *wqueue = (struct watch_queue *)buf->private;
	struct page *page;
	unsigned int bit;

	/* We need to work out which note within the page this refers to, but
	 * the note might have been maximum size, so merely ANDing the offset
	 * off doesn't work.  OTOH, the note must've been more than zero size.
	 */
	bit = buf->offset + buf->len;
	if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0)
		bit -= WATCH_QUEUE_NOTE_SIZE;
	bit /= WATCH_QUEUE_NOTE_SIZE;

	page = buf->page;
	bit += page->index;

	set_bit(bit, wqueue->notes_bitmap);
	generic_pipe_buf_release(pipe, buf);
}
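
/*
 * Worked example of the computation above (illustrative, not in the
 * original source): a note at offset 384 with length 128 gives bit = 512,
 * a multiple of WATCH_QUEUE_NOTE_SIZE, so one note size is subtracted
 * before dividing: (512 - 128) / 128 = note 3 within the page.
 * page->index then biases that by the number of the page's first note.
 */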

// No try_steal function => no stealing
#define watch_queue_pipe_buf_try_steal NULL

/* New data written to a pipe may be appended to a buffer with this type. */
static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
	.release	= watch_queue_pipe_buf_release,
	.try_steal	= watch_queue_pipe_buf_try_steal,
	.get		= generic_pipe_buf_get,
};

/*
 * Post a notification to a watch queue.
 *
 * Must be called with the RCU lock for reading, and the
 * watch_queue lock held, which guarantees that the pipe
 * hasn't been released.
 */
static bool post_one_notification(struct watch_queue *wqueue,
				  struct watch_notification *n)
{
	void *p;
	struct pipe_inode_info *pipe = wqueue->pipe;
	struct pipe_buffer *buf;
	struct page *page;
	unsigned int head, tail, mask, note, offset, len;
	bool done = false;

	spin_lock_irq(&pipe->rd_wait.lock);

	mask = pipe->ring_size - 1;
	head = pipe->head;
	tail = pipe->tail;
	if (pipe_full(head, tail, pipe->ring_size))
		goto lost;

	note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes);
	if (note >= wqueue->nr_notes)
		goto lost;

	page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE];
	offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE;
	get_page(page);
	len = n->info & WATCH_INFO_LENGTH;
	p = kmap_atomic(page);
	memcpy(p + offset, n, len);
	kunmap_atomic(p);

	buf = &pipe->bufs[head & mask];
	buf->page = page;
	buf->private = (unsigned long)wqueue;
	buf->ops = &watch_queue_pipe_buf_ops;
	buf->offset = offset;
	buf->len = len;
	buf->flags = PIPE_BUF_FLAG_WHOLE;
	smp_store_release(&pipe->head, head + 1); /* vs pipe_read() */

	if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
		spin_unlock_irq(&pipe->rd_wait.lock);
		BUG();
	}
	wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
	done = true;

out:
	spin_unlock_irq(&pipe->rd_wait.lock);
	if (done)
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	return done;

lost:
	buf = &pipe->bufs[(head - 1) & mask];
	buf->flags |= PIPE_BUF_FLAG_LOSS;
	goto out;
}
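
/*
 * Consumer-side sketch (userspace, adapted loosely from
 * Documentation/core-api/watch_queue.rst; not in the original source):
 * because each note is posted with PIPE_BUF_FLAG_WHOLE, read() returns
 * whole records, which can be walked by their encoded lengths:
 *
 *	unsigned char buffer[4096], *p, *end;
 *	ssize_t buf_len = read(fd, buffer, sizeof(buffer));
 *
 *	p = buffer;
 *	end = buffer + buf_len;
 *	while (p < end) {
 *		struct watch_notification *n = (void *)p;
 *		// dispatch on n->type / n->subtype here
 *		p += n->info & WATCH_INFO_LENGTH;
 *	}
 */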
 154
 155/*
 156 * Apply filter rules to a notification.
 157 */
 158static bool filter_watch_notification(const struct watch_filter *wf,
 159                                      const struct watch_notification *n)
 160{
 161        const struct watch_type_filter *wt;
 162        unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8;
 163        unsigned int st_index = n->subtype / st_bits;
 164        unsigned int st_bit = 1U << (n->subtype % st_bits);
 165        int i;
 166
 167        if (!test_bit(n->type, wf->type_filter))
 168                return false;
 169
 170        for (i = 0; i < wf->nr_filters; i++) {
 171                wt = &wf->filters[i];
 172                if (n->type == wt->type &&
 173                    (wt->subtype_filter[st_index] & st_bit) &&
 174                    (n->info & wt->info_mask) == wt->info_filter)
 175                        return true;
 176        }
 177
 178        return false; /* If there is a filter, the default is to reject. */
 179}
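
/*
 * Worked example (illustrative, not in the original source): with
 * st_bits = 32, a notification with subtype 33 is tested against bit 1 of
 * subtype_filter[1].  A notification passes if its type bit is set in the
 * coarse type_filter bitmap and at least one per-type rule matches both
 * that subtype bit and the masked info bits; with no matching rule the
 * event is rejected.
 */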

/**
 * __post_watch_notification - Post an event notification
 * @wlist: The watch list to post the event to.
 * @n: The notification record to post.
 * @cred: The creds of the process that triggered the notification.
 * @id: The ID to match on the watch.
 *
 * Post a notification of an event into a set of watch queues and let the users
 * know.
 *
 * The size of the notification should be set in n->info & WATCH_INFO_LENGTH
 * and should be a multiple of sizeof(*n).
 */
void __post_watch_notification(struct watch_list *wlist,
			       struct watch_notification *n,
			       const struct cred *cred,
			       u64 id)
{
	const struct watch_filter *wf;
	struct watch_queue *wqueue;
	struct watch *watch;

	if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) {
		WARN_ON(1);
		return;
	}

	rcu_read_lock();

	hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
		if (watch->id != id)
			continue;
		n->info &= ~WATCH_INFO_ID;
		n->info |= watch->info_id;

		wqueue = rcu_dereference(watch->queue);
		wf = rcu_dereference(wqueue->filter);
		if (wf && !filter_watch_notification(wf, n))
			continue;

		if (security_post_notification(watch->cred, cred, n) < 0)
			continue;

		if (lock_wqueue(wqueue)) {
			post_one_notification(wqueue, n);
			unlock_wqueue(wqueue);
		}
	}

	rcu_read_unlock();
}
EXPORT_SYMBOL(__post_watch_notification);
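
/*
 * Posting-side sketch (hypothetical caller, not in the original source;
 * "obj", "value" and "subtype" are illustrative only): a watched subsystem
 * might post an event roughly like this, using the post_watch_notification()
 * wrapper from <linux/watch_queue.h>:
 *
 *	struct {
 *		struct watch_notification watch;
 *		__u32 value;
 *		__u32 __pad;
 *	} n;
 *
 *	n.watch.type	= WATCH_TYPE_KEY_NOTIFY;	// a registered type
 *	n.watch.subtype	= subtype;
 *	n.watch.info	= watch_sizeof(n);		// encodes the length
 *	n.value		= value;
 *	post_watch_notification(obj->watchers, &n.watch, current_cred(),
 *				obj->id);
 */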

/*
 * Preallocate sufficient pages to hold the requested number of
 * notifications.
 */
long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes)
{
	struct watch_queue *wqueue = pipe->watch_queue;
	struct page **pages;
	unsigned long *bitmap;
	unsigned long user_bufs;
	int ret, i, nr_pages;

	if (!wqueue)
		return -ENODEV;
	if (wqueue->notes)
		return -EBUSY;

	if (nr_notes < 1 ||
	    nr_notes > 512) /* TODO: choose a better hard limit */
		return -EINVAL;

	nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1);
	nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE;
	user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages);

	if (nr_pages > pipe->max_usage &&
	    (too_many_pipe_buffers_hard(user_bufs) ||
	     too_many_pipe_buffers_soft(user_bufs)) &&
	    pipe_is_unprivileged_user()) {
		ret = -EPERM;
		goto error;
	}

	nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE;
	ret = pipe_resize_ring(pipe, roundup_pow_of_two(nr_notes));
	if (ret < 0)
		goto error;

	ret = -ENOMEM;
	pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
	if (!pages)
		goto error;

	for (i = 0; i < nr_pages; i++) {
		pages[i] = alloc_page(GFP_KERNEL);
		if (!pages[i])
			goto error_p;
		pages[i]->index = i * WATCH_QUEUE_NOTES_PER_PAGE;
	}

	bitmap = bitmap_alloc(nr_notes, GFP_KERNEL);
	if (!bitmap)
		goto error_p;

	bitmap_fill(bitmap, nr_notes);
	wqueue->notes = pages;
	wqueue->notes_bitmap = bitmap;
	wqueue->nr_pages = nr_pages;
	wqueue->nr_notes = nr_notes;
	return 0;

error_p:
	while (--i >= 0)
		__free_page(pages[i]);
	kfree(pages);
error:
	(void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted);
	return ret;
}
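
/*
 * Userspace-side sketch (adapted loosely from
 * Documentation/core-api/watch_queue.rst; not in the original source): the
 * queue is a pipe created with O_NOTIFICATION_PIPE, then sized via ioctl():
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_NOTIFICATION_PIPE) < 0)
 *		err(1, "pipe2");
 *	if (ioctl(fds[0], IOC_WATCH_QUEUE_SET_SIZE, 256) < 0)
 *		err(1, "ioctl");
 *
 * The argument is the number of notification slots to preallocate; it
 * reaches this function through the pipe ioctl handler.
 */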

/*
 * Set the filter on a watch queue.
 */
long watch_queue_set_filter(struct pipe_inode_info *pipe,
			    struct watch_notification_filter __user *_filter)
{
	struct watch_notification_type_filter *tf;
	struct watch_notification_filter filter;
	struct watch_type_filter *q;
	struct watch_filter *wfilter;
	struct watch_queue *wqueue = pipe->watch_queue;
	int ret, nr_filter = 0, i;

	if (!wqueue)
		return -ENODEV;

	if (!_filter) {
		/* Remove the old filter */
		wfilter = NULL;
		goto set;
	}

	/* Grab the user's filter specification */
	if (copy_from_user(&filter, _filter, sizeof(filter)) != 0)
		return -EFAULT;
	if (filter.nr_filters == 0 ||
	    filter.nr_filters > 16 ||
	    filter.__reserved != 0)
		return -EINVAL;

	tf = memdup_array_user(_filter->filters, filter.nr_filters, sizeof(*tf));
	if (IS_ERR(tf))
		return PTR_ERR(tf);

	ret = -EINVAL;
	for (i = 0; i < filter.nr_filters; i++) {
		if ((tf[i].info_filter & ~tf[i].info_mask) ||
		    tf[i].info_mask & WATCH_INFO_LENGTH)
			goto err_filter;
		/* Ignore any unknown types */
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;
		nr_filter++;
	}

	/* Now we need to build the internal filter from only the relevant
	 * user-specified filters.
	 */
	ret = -ENOMEM;
	wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL);
	if (!wfilter)
		goto err_filter;
	wfilter->nr_filters = nr_filter;

	q = wfilter->filters;
	for (i = 0; i < filter.nr_filters; i++) {
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;

		q->type			= tf[i].type;
		q->info_filter		= tf[i].info_filter;
		q->info_mask		= tf[i].info_mask;
		q->subtype_filter[0]	= tf[i].subtype_filter[0];
		__set_bit(q->type, wfilter->type_filter);
		q++;
	}

	kfree(tf);
set:
	pipe_lock(pipe);
	wfilter = rcu_replace_pointer(wqueue->filter, wfilter,
				      lockdep_is_held(&pipe->mutex));
	pipe_unlock(pipe);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	return 0;

err_filter:
	kfree(tf);
	return ret;
}
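
/*
 * Userspace-side sketch (adapted loosely from
 * Documentation/core-api/watch_queue.rst; not in the original source):
 *
 *	static struct watch_notification_filter filter = {
 *		.nr_filters	= 1,
 *		.filters = {
 *			[0] = {
 *				.type			= WATCH_TYPE_KEY_NOTIFY,
 *				.subtype_filter[0]	= UINT_MAX,
 *			},
 *		},
 *	};
 *
 *	ioctl(fds[0], IOC_WATCH_QUEUE_SET_FILTER, &filter);
 *
 * Passing a NULL argument instead removes any installed filter.
 */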

static void __put_watch_queue(struct kref *kref)
{
	struct watch_queue *wqueue =
		container_of(kref, struct watch_queue, usage);
	struct watch_filter *wfilter;
	int i;

	for (i = 0; i < wqueue->nr_pages; i++)
		__free_page(wqueue->notes[i]);
	kfree(wqueue->notes);
	bitmap_free(wqueue->notes_bitmap);

	wfilter = rcu_access_pointer(wqueue->filter);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	kfree_rcu(wqueue, rcu);
}

/**
 * put_watch_queue - Dispose of a ref on a watch queue.
 * @wqueue: The watch queue to unref.
 */
void put_watch_queue(struct watch_queue *wqueue)
{
	kref_put(&wqueue->usage, __put_watch_queue);
}
EXPORT_SYMBOL(put_watch_queue);

static void free_watch(struct rcu_head *rcu)
{
	struct watch *watch = container_of(rcu, struct watch, rcu);

	put_watch_queue(rcu_access_pointer(watch->queue));
	atomic_dec(&watch->cred->user->nr_watches);
	put_cred(watch->cred);
	kfree(watch);
}

static void __put_watch(struct kref *kref)
{
	struct watch *watch = container_of(kref, struct watch, usage);

	call_rcu(&watch->rcu, free_watch);
}

/*
 * Discard a watch.
 */
static void put_watch(struct watch *watch)
{
	kref_put(&watch->usage, __put_watch);
}

/**
 * init_watch - Initialise a watch
 * @watch: The watch to initialise.
 * @wqueue: The queue to assign.
 *
 * Initialise a watch and set the watch queue.
 */
void init_watch(struct watch *watch, struct watch_queue *wqueue)
{
	kref_init(&watch->usage);
	INIT_HLIST_NODE(&watch->list_node);
	INIT_HLIST_NODE(&watch->queue_node);
	rcu_assign_pointer(watch->queue, wqueue);
}

static int add_one_watch(struct watch *watch, struct watch_list *wlist, struct watch_queue *wqueue)
{
	const struct cred *cred;
	struct watch *w;

	hlist_for_each_entry(w, &wlist->watchers, list_node) {
		struct watch_queue *wq = rcu_access_pointer(w->queue);
		if (wqueue == wq && watch->id == w->id)
			return -EBUSY;
	}

	cred = current_cred();
	if (atomic_inc_return(&cred->user->nr_watches) > task_rlimit(current, RLIMIT_NOFILE)) {
		atomic_dec(&cred->user->nr_watches);
		return -EAGAIN;
	}

	watch->cred = get_cred(cred);
	rcu_assign_pointer(watch->watch_list, wlist);

	kref_get(&wqueue->usage);
	kref_get(&watch->usage);
	hlist_add_head(&watch->queue_node, &wqueue->watches);
	hlist_add_head_rcu(&watch->list_node, &wlist->watchers);
	return 0;
}

/**
 * add_watch_to_object - Add a watch on an object to a watch list
 * @watch: The watch to add
 * @wlist: The watch list to add to
 *
 * @watch->queue must have been set to point to the queue to post
 * notifications to, and @watch->id to the ID the watch is to match on.
 * The watch's credentials are taken from the current process and a ref is
 * taken on them.
 *
 * The caller must pin both the queue and the list and must hold the list
 * locked against racing watch additions/removals.
 */
int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
{
	struct watch_queue *wqueue;
	int ret = -ENOENT;

	rcu_read_lock();

	wqueue = rcu_access_pointer(watch->queue);
	if (lock_wqueue(wqueue)) {
		spin_lock(&wlist->lock);
		ret = add_one_watch(watch, wlist, wqueue);
		spin_unlock(&wlist->lock);
		unlock_wqueue(wqueue);
	}

	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(add_watch_to_object);
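
/*
 * Watcher-setup sketch (hypothetical caller, loosely modelled on
 * keyctl_watch_key(); not in the original source; "obj", "tag" and the
 * semaphore are illustrative only):
 *
 *	struct watch_queue *wqueue = get_watch_queue(watch_queue_fd);
 *	struct watch *watch = kzalloc(sizeof(*watch), GFP_KERNEL);
 *
 *	init_watch(watch, wqueue);
 *	watch->id = obj->id;			// matched by posters
 *	watch->info_id = (u32)tag << WATCH_INFO_ID__SHIFT;
 *
 *	down_write(&obj->sem);			// the "list lock" here
 *	ret = add_watch_to_object(watch, obj->watchers);
 *	up_write(&obj->sem);
 *	put_watch_queue(wqueue);
 *
 * Error handling is omitted for brevity.
 */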

/**
 * remove_watch_from_object - Remove a watch or all watches from an object.
 * @wlist: The watch list to remove from
 * @wq: The watch queue of interest (ignored if @all is true)
 * @id: The ID of the watch to remove (ignored if @all is true)
 * @all: True to remove all watches
 *
 * Remove a specific watch or all watches from an object.  A notification is
 * sent to the watcher to tell them that this happened.
 */
int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
			     u64 id, bool all)
{
	struct watch_notification_removal n;
	struct watch_queue *wqueue;
	struct watch *watch;
	int ret = -EBADSLT;

	rcu_read_lock();

again:
	spin_lock(&wlist->lock);
	hlist_for_each_entry(watch, &wlist->watchers, list_node) {
		if (all ||
		    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
			goto found;
	}
	spin_unlock(&wlist->lock);
	goto out;

found:
	ret = 0;
	hlist_del_init_rcu(&watch->list_node);
	rcu_assign_pointer(watch->watch_list, NULL);
	spin_unlock(&wlist->lock);

	/* We now own the reference on watch that used to belong to wlist. */

	n.watch.type = WATCH_TYPE_META;
	n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION;
	n.watch.info = watch->info_id | watch_sizeof(n.watch);
	n.id = id;
	if (id != 0)
		n.watch.info = watch->info_id | watch_sizeof(n);

	wqueue = rcu_dereference(watch->queue);

	if (lock_wqueue(wqueue)) {
		post_one_notification(wqueue, &n.watch);

		if (!hlist_unhashed(&watch->queue_node)) {
			hlist_del_init_rcu(&watch->queue_node);
			put_watch(watch);
		}

		unlock_wqueue(wqueue);
	}

	if (wlist->release_watch) {
		void (*release_watch)(struct watch *);

		release_watch = wlist->release_watch;
		rcu_read_unlock();
		(*release_watch)(watch);
		rcu_read_lock();
	}
	put_watch(watch);

	if (all && !hlist_empty(&wlist->watchers))
		goto again;
out:
	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(remove_watch_from_object);
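
/*
 * Note (illustrative, not in the original source): the removal above is
 * what a watcher sees as a record with type WATCH_TYPE_META and subtype
 * WATCH_META_REMOVAL_NOTIFICATION; if the watch had a nonzero ID, the
 * record is extended to the larger watch_notification_removal so that
 * n.id identifies which watch went away.
 */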

/*
 * Remove all the watches that are contributory to a queue.  This has the
 * potential to race with removal of the watches by the destruction of the
 * objects being watched or with the distribution of notifications.
 */
void watch_queue_clear(struct watch_queue *wqueue)
{
	struct watch_list *wlist;
	struct watch *watch;
	bool release;

	rcu_read_lock();
	spin_lock_bh(&wqueue->lock);

	/*
	 * This pipe can be freed by callers like free_pipe_info().
	 * Removing this reference also prevents new notifications.
	 */
	wqueue->pipe = NULL;

	while (!hlist_empty(&wqueue->watches)) {
		watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
		hlist_del_init_rcu(&watch->queue_node);
		/* We now own a ref on the watch. */
		spin_unlock_bh(&wqueue->lock);

		/* We can't do the next bit under the queue lock as we need to
		 * get the list lock - which would cause a deadlock if someone
		 * was removing from the opposite direction at the same time or
		 * posting a notification.
		 */
		wlist = rcu_dereference(watch->watch_list);
		if (wlist) {
			void (*release_watch)(struct watch *);

			spin_lock(&wlist->lock);

			release = !hlist_unhashed(&watch->list_node);
			if (release) {
				hlist_del_init_rcu(&watch->list_node);
				rcu_assign_pointer(watch->watch_list, NULL);

				/* We now own a second ref on the watch. */
			}

			release_watch = wlist->release_watch;
			spin_unlock(&wlist->lock);

			if (release) {
				if (release_watch) {
					rcu_read_unlock();
					/* This might need to call dput(), so
					 * we have to drop all the locks.
					 */
					(*release_watch)(watch);
					rcu_read_lock();
				}
				put_watch(watch);
			}
		}

		put_watch(watch);
		spin_lock_bh(&wqueue->lock);
	}

	spin_unlock_bh(&wqueue->lock);
	rcu_read_unlock();
}

/**
 * get_watch_queue - Get a watch queue from its file descriptor.
 * @fd: The fd to query.
 */
struct watch_queue *get_watch_queue(int fd)
{
	struct pipe_inode_info *pipe;
	struct watch_queue *wqueue = ERR_PTR(-EINVAL);
	struct fd f;

	f = fdget(fd);
	if (f.file) {
		pipe = get_pipe_info(f.file, false);
		if (pipe && pipe->watch_queue) {
			wqueue = pipe->watch_queue;
			kref_get(&wqueue->usage);
		}
		fdput(f);
	}

	return wqueue;
}
EXPORT_SYMBOL(get_watch_queue);

/*
 * Initialise a watch queue
 */
int watch_queue_init(struct pipe_inode_info *pipe)
{
	struct watch_queue *wqueue;

	wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
	if (!wqueue)
		return -ENOMEM;

	wqueue->pipe = pipe;
	kref_init(&wqueue->usage);
	spin_lock_init(&wqueue->lock);
	INIT_HLIST_HEAD(&wqueue->watches);

	pipe->watch_queue = wqueue;
	return 0;
}