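/*
 * Generic workqueue implementation: per-CPU worker threads that execute
 * queued work items (struct work_struct) in process context.  This copy
 * is built as part of the DDE_LINUX environment (see the DDE_LINUX
 * blocks below).
 */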
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>

#ifdef DDE_LINUX
#include "local.h"
#endif

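/*
 * Per-CPU state of a workqueue: the list of pending work items, the
 * worker thread that processes them, and the lock protecting the list.
 */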
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;

	int run_depth;
} ____cacheline_aligned;

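/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues plus the queue's name and flags.
 */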
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

/* Serializes accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;

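/*
 * cpu_populated_map tracks the CPUs that have a worker thread.  It is
 * kept up to date by the CPU-hotplug callback below and is used instead
 * of cpu_online_mask when flushing, so that a CPU on its way down is
 * still covered.
 */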
static cpumask_var_t cpu_populated_map __read_mostly;

static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}

static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

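/*
 * work->data packs the WORK_STRUCT_* flag bits together with a pointer
 * to the cpu_workqueue_struct the work was last queued on.  Set and
 * retrieve that pointer atomically without disturbing the flag bits.
 */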
static inline void set_wq_data(struct work_struct *work,
			       struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

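/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * The work is queued to the CPU on which it was submitted, but if that
 * CPU dies it can be processed by another CPU.
 */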
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

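/**
 * queue_work_on - queue work on a specific CPU
 * @cpu: CPU number to execute the work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * The caller must ensure that the CPU can't go away while the work is
 * queued on it.
 */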
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
	struct workqueue_struct *wq = cwq->wq;

	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

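/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */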
int queue_delayed_work(struct workqueue_struct *wq,
		       struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

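/**
 * queue_delayed_work_on - queue work on a specific CPU after delay
 * @cpu: CPU number to execute the work on, or -1 for the local CPU
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */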
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			  struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores the cwq for the moment, for the timer_fn. */
		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* Recursion via flush_cpu_workqueue() from a work item. */
		printk("%s: recursion depth exceeded: %d\n",
			__func__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * The work item is allowed to free its own work_struct
		 * from within f(), so take a copy of its lockdep_map:
		 * the lock_map_release() below must not touch freed memory.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif

		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
#ifndef DDE_LINUX
			printk(KERN_ERR " last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
#endif
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irq(&cwq->lock);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	set_user_nice(current, -5);

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct work;
	struct completion done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, head);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active;

	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue.
		 * Simply run it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
		active = 1;
	} else {
		struct wq_barrier barr;

		active = 0;
		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, &cwq->worklist);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}

	return active;
}

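/**
 * flush_workqueue - ensure that any scheduled work has run to completion
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all work which was queued on entry has been handled,
 * but we are not livelocked by new incoming work.
 */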
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu_mask_nr(cpu, *cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

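/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns 0 if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work not to be requeued; otherwise it doesn't make
 * sense to use this function.
 */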
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	prev = NULL;
	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto out;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto out;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);
out:
	spin_unlock_irq(&cwq->lock);
	if (!prev)
		return 0;

	wait_for_completion(&barr.done);
	return 1;
}
EXPORT_SYMBOL_GPL(flush_work);

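/*
 * Upon a successful return (>= 0), the caller "owns" the
 * WORK_STRUCT_PENDING bit, so this work can't be re-armed in any way.
 */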
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */
	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running))
		wait_for_completion(&barr.done);
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu_mask_nr(cpu, *cpu_map)
		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list *timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	work_clear_pending(work);
	return ret;
}

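/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work to cancel
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() cancels the work if it is queued. If the work's
 * callback appears to be running, it blocks until the callback has
 * completed. It may be used even if the work re-queues itself.
 *
 * cancel_work_sync(&delayed_work->work) should only be used if ->timer
 * is not pending; use cancel_delayed_work_sync() instead.
 */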
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

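/**
 * cancel_delayed_work_sync - reliably kill off a delayed work
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function even if @dwork rearms itself via
 * queue_work() or queue_delayed_work(). See also cancel_work_sync().
 */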
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);

static struct workqueue_struct *keventd_wq __read_mostly;

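/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */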
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

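/**
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu in the kernel-global workqueue.
 */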
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

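/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait, or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */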
int schedule_delayed_work(struct delayed_work *dwork,
			  unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

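/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */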
int schedule_delayed_work_on(int cpu,
			     struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

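/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success, a negative errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */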
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		schedule_work_on(cpu, work);
	}
	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));
	put_online_cpus();
	free_percpu(works);
	return 0;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

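/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn: the function to execute
 * @ew: guaranteed storage for the execute work structure (must be
 *      available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns 0 if the function was executed, 1 if it was scheduled.
 */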
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add a work_struct to this cwq yet: either the caller
	 * is __create_workqueue() and nobody can see the cwq, or the caller
	 * is CPU_UP_PREPARE and the cpu is not yet in cpu_online_map.
	 * So we can abort safely on error.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (cwq->wq->rt)
		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
	cwq->thread = p;

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on the list even if the code below
		 * fails: cpu_down() could remove a cpu from cpu_populated_map
		 * before destroy_workqueue() takes workqueue_lock, and we
		 * would leak cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize the cwqs for each possible cpu even if
		 * we are going to call destroy_workqueue() in the end,
		 * otherwise cpu_up() could hit an uninitialized cwq once we
		 * drop the lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD;
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * In that case run_workqueue() won't return until it has flushed
	 * all work_structs; once ->worklist is empty it is safe to stop
	 * the thread, because no more work can be queued on this cwq:
	 * flush_workqueue() checks list_empty() and a normal queue_work()
	 * can't use a dead CPU.
	 */
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

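/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */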
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu_mask_nr(cpu, *cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int ret = NOTIFY_OK;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			ret = NOTIFY_BAD;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
			/* fall through: the created thread must be stopped too */
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return ret;
}

#ifdef CONFIG_SMP
static struct workqueue_struct *work_on_cpu_wq __read_mostly;

struct work_for_cpu {
	struct work_struct work;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static void do_work_for_cpu(struct work_struct *w)
{
	struct work_for_cpu *wfc = container_of(w, struct work_for_cpu, work);

	wfc->ret = wfc->fn(wfc->arg);
}

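/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function argument
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 */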
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct work_for_cpu wfc;

	INIT_WORK(&wfc.work, do_work_for_cpu);
	wfc.fn = fn;
	wfc.arg = arg;
	queue_work_on(cpu, work_on_cpu_wq, &wfc.work);
	flush_work(&wfc.work);

	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
#ifdef CONFIG_SMP
	work_on_cpu_wq = create_workqueue("work_on_cpu");
	BUG_ON(!work_on_cpu_wq);
#endif
}

#ifdef DDE_LINUX
core_initcall(init_workqueues);
#endif