linux-bk/kernel/workqueue.c
<<
>>
Prefs
   1/*
   2 * linux/kernel/workqueue.c
   3 *
   4 * Generic mechanism for defining kernel helper threads for running
   5 * arbitrary tasks in process context.
   6 *
   7 * Started by Ingo Molnar, Copyright (C) 2002
   8 *
   9 * Derived from the taskqueue/keventd code by:
  10 *
  11 *   David Woodhouse <dwmw2@redhat.com>
  12 *   Andrew Morton <andrewm@uow.edu.au>
  13 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
  14 *   Theodore Ts'o <tytso@mit.edu>
  15 */
  16
  17#include <linux/module.h>
  18#include <linux/kernel.h>
  19#include <linux/sched.h>
  20#include <linux/init.h>
  21#include <linux/signal.h>
  22#include <linux/completion.h>
  23#include <linux/workqueue.h>
  24#include <linux/slab.h>
  25#include <linux/cpu.h>
  26#include <linux/notifier.h>
  27#include <linux/kthread.h>
  28
  29/*
  30 * The per-CPU workqueue (if single thread, we always use cpu 0's).
  31 *
  32 * The sequence counters are for flush_scheduled_work().  It wants to wait
  33 * until until all currently-scheduled works are completed, but it doesn't
  34 * want to be livelocked by new, incoming ones.  So it waits until
  35 * remove_sequence is >= the insert_sequence which pertained when
  36 * flush_scheduled_work() was called.
  37 */
  38struct cpu_workqueue_struct {
  39
  40        spinlock_t lock;
  41
  42        long remove_sequence;   /* Least-recently added (next to run) */
  43        long insert_sequence;   /* Next to add */
  44
  45        struct list_head worklist;
  46        wait_queue_head_t more_work;
  47        wait_queue_head_t work_done;
  48
  49        struct workqueue_struct *wq;
  50        task_t *thread;
  51
  52        int run_depth;          /* Detect run_workqueue() recursion depth */
  53} ____cacheline_aligned;
  54
  55/*
  56 * The externally visible workqueue abstraction is an array of
  57 * per-CPU workqueues:
  58 */
  59struct workqueue_struct {
  60        struct cpu_workqueue_struct cpu_wq[NR_CPUS];
  61        const char *name;
  62        struct list_head list;  /* Empty if single thread */
  63};
  64
  65/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
  66   threads to each one as cpus come/go. */
  67static spinlock_t workqueue_lock = SPIN_LOCK_UNLOCKED;
  68static LIST_HEAD(workqueues);
  69
  70/* If it's single threaded, it isn't in the list of workqueues. */
  71static inline int is_single_threaded(struct workqueue_struct *wq)
  72{
  73        return list_empty(&wq->list);
  74}
  75
  76/* Preempt must be disabled. */
  77static void __queue_work(struct cpu_workqueue_struct *cwq,
  78                         struct work_struct *work)
  79{
  80        unsigned long flags;
  81
  82        spin_lock_irqsave(&cwq->lock, flags);
  83        work->wq_data = cwq;
  84        list_add_tail(&work->entry, &cwq->worklist);
  85        cwq->insert_sequence++;
  86        wake_up(&cwq->more_work);
  87        spin_unlock_irqrestore(&cwq->lock, flags);
  88}
  89
  90/*
  91 * Queue work on a workqueue. Return non-zero if it was successfully
  92 * added.
  93 *
  94 * We queue the work to the CPU it was submitted, but there is no
  95 * guarantee that it will be processed by that CPU.
  96 */
  97int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
  98{
  99        int ret = 0, cpu = get_cpu();
 100
 101        if (!test_and_set_bit(0, &work->pending)) {
 102                if (unlikely(is_single_threaded(wq)))
 103                        cpu = 0;
 104                BUG_ON(!list_empty(&work->entry));
 105                __queue_work(wq->cpu_wq + cpu, work);
 106                ret = 1;
 107        }
 108        put_cpu();
 109        return ret;
 110}
 111
 112static void delayed_work_timer_fn(unsigned long __data)
 113{
 114        struct work_struct *work = (struct work_struct *)__data;
 115        struct workqueue_struct *wq = work->wq_data;
 116        int cpu = smp_processor_id();
 117
 118        if (unlikely(is_single_threaded(wq)))
 119                cpu = 0;
 120
 121        __queue_work(wq->cpu_wq + cpu, work);
 122}
 123
 124int fastcall queue_delayed_work(struct workqueue_struct *wq,
 125                        struct work_struct *work, unsigned long delay)
 126{
 127        int ret = 0;
 128        struct timer_list *timer = &work->timer;
 129
 130        if (!test_and_set_bit(0, &work->pending)) {
 131                BUG_ON(timer_pending(timer));
 132                BUG_ON(!list_empty(&work->entry));
 133
 134                /* This stores wq for the moment, for the timer_fn */
 135                work->wq_data = wq;
 136                timer->expires = jiffies + delay;
 137                timer->data = (unsigned long)work;
 138                timer->function = delayed_work_timer_fn;
 139                add_timer(timer);
 140                ret = 1;
 141        }
 142        return ret;
 143}
 144
 145static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
 146{
 147        unsigned long flags;
 148
 149        /*
 150         * Keep taking off work from the queue until
 151         * done.
 152         */
 153        spin_lock_irqsave(&cwq->lock, flags);
 154        cwq->run_depth++;
 155        if (cwq->run_depth > 3) {
 156                /* morton gets to eat his hat */
 157                printk("%s: recursion depth exceeded: %d\n",
 158                        __FUNCTION__, cwq->run_depth);
 159                dump_stack();
 160        }
 161        while (!list_empty(&cwq->worklist)) {
 162                struct work_struct *work = list_entry(cwq->worklist.next,
 163                                                struct work_struct, entry);
 164                void (*f) (void *) = work->func;
 165                void *data = work->data;
 166
 167                list_del_init(cwq->worklist.next);
 168                spin_unlock_irqrestore(&cwq->lock, flags);
 169
 170                BUG_ON(work->wq_data != cwq);
 171                clear_bit(0, &work->pending);
 172                f(data);
 173
 174                spin_lock_irqsave(&cwq->lock, flags);
 175                cwq->remove_sequence++;
 176                wake_up(&cwq->work_done);
 177        }
 178        cwq->run_depth--;
 179        spin_unlock_irqrestore(&cwq->lock, flags);
 180}
 181
 182static int worker_thread(void *__cwq)
 183{
 184        struct cpu_workqueue_struct *cwq = __cwq;
 185        DECLARE_WAITQUEUE(wait, current);
 186        struct k_sigaction sa;
 187        sigset_t blocked;
 188
 189        current->flags |= PF_NOFREEZE;
 190
 191        set_user_nice(current, -10);
 192
 193        /* Block and flush all signals */
 194        sigfillset(&blocked);
 195        sigprocmask(SIG_BLOCK, &blocked, NULL);
 196        flush_signals(current);
 197
 198        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
 199        sa.sa.sa_handler = SIG_IGN;
 200        sa.sa.sa_flags = 0;
 201        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
 202        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
 203
 204        set_current_state(TASK_INTERRUPTIBLE);
 205        while (!kthread_should_stop()) {
 206                add_wait_queue(&cwq->more_work, &wait);
 207                if (list_empty(&cwq->worklist))
 208                        schedule();
 209                else
 210                        __set_current_state(TASK_RUNNING);
 211                remove_wait_queue(&cwq->more_work, &wait);
 212
 213                if (!list_empty(&cwq->worklist))
 214                        run_workqueue(cwq);
 215                set_current_state(TASK_INTERRUPTIBLE);
 216        }
 217        __set_current_state(TASK_RUNNING);
 218        return 0;
 219}
 220
 221static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
 222{
 223        if (cwq->thread == current) {
 224                /*
 225                 * Probably keventd trying to flush its own queue. So simply run
 226                 * it by hand rather than deadlocking.
 227                 */
 228                run_workqueue(cwq);
 229        } else {
 230                DEFINE_WAIT(wait);
 231                long sequence_needed;
 232
 233                spin_lock_irq(&cwq->lock);
 234                sequence_needed = cwq->insert_sequence;
 235
 236                while (sequence_needed - cwq->remove_sequence > 0) {
 237                        prepare_to_wait(&cwq->work_done, &wait,
 238                                        TASK_UNINTERRUPTIBLE);
 239                        spin_unlock_irq(&cwq->lock);
 240                        schedule();
 241                        spin_lock_irq(&cwq->lock);
 242                }
 243                finish_wait(&cwq->work_done, &wait);
 244                spin_unlock_irq(&cwq->lock);
 245        }
 246}
 247
 248/*
 249 * flush_workqueue - ensure that any scheduled work has run to completion.
 250 *
 251 * Forces execution of the workqueue and blocks until its completion.
 252 * This is typically used in driver shutdown handlers.
 253 *
 254 * This function will sample each workqueue's current insert_sequence number and
 255 * will sleep until the head sequence is greater than or equal to that.  This
 256 * means that we sleep until all works which were queued on entry have been
 257 * handled, but we are not livelocked by new incoming ones.
 258 *
 259 * This function used to run the workqueues itself.  Now we just wait for the
 260 * helper threads to do it.
 261 */
 262void fastcall flush_workqueue(struct workqueue_struct *wq)
 263{
 264        might_sleep();
 265
 266        if (is_single_threaded(wq)) {
 267                /* Always use cpu 0's area. */
 268                flush_cpu_workqueue(wq->cpu_wq + 0);
 269        } else {
 270                int cpu;
 271
 272                lock_cpu_hotplug();
 273                for_each_online_cpu(cpu)
 274                        flush_cpu_workqueue(wq->cpu_wq + cpu);
 275                unlock_cpu_hotplug();
 276        }
 277}
 278
 279static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
 280                                                   int cpu)
 281{
 282        struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
 283        struct task_struct *p;
 284
 285        spin_lock_init(&cwq->lock);
 286        cwq->wq = wq;
 287        cwq->thread = NULL;
 288        cwq->insert_sequence = 0;
 289        cwq->remove_sequence = 0;
 290        INIT_LIST_HEAD(&cwq->worklist);
 291        init_waitqueue_head(&cwq->more_work);
 292        init_waitqueue_head(&cwq->work_done);
 293
 294        if (is_single_threaded(wq))
 295                p = kthread_create(worker_thread, cwq, "%s", wq->name);
 296        else
 297                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
 298        if (IS_ERR(p))
 299                return NULL;
 300        cwq->thread = p;
 301        return p;
 302}
 303
 304struct workqueue_struct *__create_workqueue(const char *name,
 305                                            int singlethread)
 306{
 307        int cpu, destroy = 0;
 308        struct workqueue_struct *wq;
 309        struct task_struct *p;
 310
 311        BUG_ON(strlen(name) > 10);
 312
 313        wq = kmalloc(sizeof(*wq), GFP_KERNEL);
 314        if (!wq)
 315                return NULL;
 316        memset(wq, 0, sizeof(*wq));
 317
 318        wq->name = name;
 319        /* We don't need the distraction of CPUs appearing and vanishing. */
 320        lock_cpu_hotplug();
 321        if (singlethread) {
 322                INIT_LIST_HEAD(&wq->list);
 323                p = create_workqueue_thread(wq, 0);
 324                if (!p)
 325                        destroy = 1;
 326                else
 327                        wake_up_process(p);
 328        } else {
 329                spin_lock(&workqueue_lock);
 330                list_add(&wq->list, &workqueues);
 331                spin_unlock(&workqueue_lock);
 332                for_each_online_cpu(cpu) {
 333                        p = create_workqueue_thread(wq, cpu);
 334                        if (p) {
 335                                kthread_bind(p, cpu);
 336                                wake_up_process(p);
 337                        } else
 338                                destroy = 1;
 339                }
 340        }
 341        unlock_cpu_hotplug();
 342
 343        /*
 344         * Was there any error during startup? If yes then clean up:
 345         */
 346        if (destroy) {
 347                destroy_workqueue(wq);
 348                wq = NULL;
 349        }
 350        return wq;
 351}
 352
 353static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
 354{
 355        struct cpu_workqueue_struct *cwq;
 356        unsigned long flags;
 357        struct task_struct *p;
 358
 359        cwq = wq->cpu_wq + cpu;
 360        spin_lock_irqsave(&cwq->lock, flags);
 361        p = cwq->thread;
 362        cwq->thread = NULL;
 363        spin_unlock_irqrestore(&cwq->lock, flags);
 364        if (p)
 365                kthread_stop(p);
 366}
 367
 368void destroy_workqueue(struct workqueue_struct *wq)
 369{
 370        int cpu;
 371
 372        flush_workqueue(wq);
 373
 374        /* We don't need the distraction of CPUs appearing and vanishing. */
 375        lock_cpu_hotplug();
 376        if (is_single_threaded(wq))
 377                cleanup_workqueue_thread(wq, 0);
 378        else {
 379                for_each_online_cpu(cpu)
 380                        cleanup_workqueue_thread(wq, cpu);
 381                spin_lock(&workqueue_lock);
 382                list_del(&wq->list);
 383                spin_unlock(&workqueue_lock);
 384        }
 385        unlock_cpu_hotplug();
 386        kfree(wq);
 387}
 388
 389static struct workqueue_struct *keventd_wq;
 390
 391int fastcall schedule_work(struct work_struct *work)
 392{
 393        return queue_work(keventd_wq, work);
 394}
 395
 396int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
 397{
 398        return queue_delayed_work(keventd_wq, work, delay);
 399}
 400
 401void flush_scheduled_work(void)
 402{
 403        flush_workqueue(keventd_wq);
 404}
 405
 406int keventd_up(void)
 407{
 408        return keventd_wq != NULL;
 409}
 410
 411int current_is_keventd(void)
 412{
 413        struct cpu_workqueue_struct *cwq;
 414        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
 415        int ret = 0;
 416
 417        BUG_ON(!keventd_wq);
 418
 419        cwq = keventd_wq->cpu_wq + cpu;
 420        if (current == cwq->thread)
 421                ret = 1;
 422
 423        return ret;
 424
 425}
 426
 427#ifdef CONFIG_HOTPLUG_CPU
 428/* Take the work from this (downed) CPU. */
 429static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
 430{
 431        struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
 432        LIST_HEAD(list);
 433        struct work_struct *work;
 434
 435        spin_lock_irq(&cwq->lock);
 436        list_splice_init(&cwq->worklist, &list);
 437
 438        while (!list_empty(&list)) {
 439                printk("Taking work for %s\n", wq->name);
 440                work = list_entry(list.next,struct work_struct,entry);
 441                list_del(&work->entry);
 442                __queue_work(wq->cpu_wq + smp_processor_id(), work);
 443        }
 444        spin_unlock_irq(&cwq->lock);
 445}
 446
 447/* We're holding the cpucontrol mutex here */
 448static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 449                                  unsigned long action,
 450                                  void *hcpu)
 451{
 452        unsigned int hotcpu = (unsigned long)hcpu;
 453        struct workqueue_struct *wq;
 454
 455        switch (action) {
 456        case CPU_UP_PREPARE:
 457                /* Create a new workqueue thread for it. */
 458                list_for_each_entry(wq, &workqueues, list) {
 459                        if (create_workqueue_thread(wq, hotcpu) < 0) {
 460                                printk("workqueue for %i failed\n", hotcpu);
 461                                return NOTIFY_BAD;
 462                        }
 463                }
 464                break;
 465
 466        case CPU_ONLINE:
 467                /* Kick off worker threads. */
 468                list_for_each_entry(wq, &workqueues, list)
 469                        wake_up_process(wq->cpu_wq[hotcpu].thread);
 470                break;
 471
 472        case CPU_UP_CANCELED:
 473                list_for_each_entry(wq, &workqueues, list) {
 474                        /* Unbind so it can run. */
 475                        kthread_bind(wq->cpu_wq[hotcpu].thread,
 476                                     smp_processor_id());
 477                        cleanup_workqueue_thread(wq, hotcpu);
 478                }
 479                break;
 480
 481        case CPU_DEAD:
 482                list_for_each_entry(wq, &workqueues, list)
 483                        cleanup_workqueue_thread(wq, hotcpu);
 484                list_for_each_entry(wq, &workqueues, list)
 485                        take_over_work(wq, hotcpu);
 486                break;
 487        }
 488
 489        return NOTIFY_OK;
 490}
 491#endif
 492
 493void init_workqueues(void)
 494{
 495        hotcpu_notifier(workqueue_cpu_callback, 0);
 496        keventd_wq = create_workqueue("events");
 497        BUG_ON(!keventd_wq);
 498}
 499
 500EXPORT_SYMBOL_GPL(__create_workqueue);
 501EXPORT_SYMBOL_GPL(queue_work);
 502EXPORT_SYMBOL_GPL(queue_delayed_work);
 503EXPORT_SYMBOL_GPL(flush_workqueue);
 504EXPORT_SYMBOL_GPL(destroy_workqueue);
 505
 506EXPORT_SYMBOL(schedule_work);
 507EXPORT_SYMBOL(schedule_delayed_work);
 508EXPORT_SYMBOL(flush_scheduled_work);
 509
 510
lxr.linux.no kindly hosted by Redpill Linpro AS, provider of Linux consulting and operations services since 1995.