    
9017 Introduce taskq_empty()
Reviewed by: Bryan Cantrill <bryan@joyent.com>
Reviewed by: Dan McDonald <danmcd@joyent.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Yuri Pankov <yuripv@yuripv.net>
    
      
    
    
          --- old/usr/src/uts/common/os/taskq.c
          +++ new/usr/src/uts/common/os/taskq.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  
  
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23   23   * Use is subject to license terms.
  24   24   */
  25   25  
  26   26  /*
  27   27   * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
  28   28   * Copyright (c) 2017 by Delphix. All rights reserved.
       29 + * Copyright 2018, Joyent, Inc.
  29   30   */
  30   31  
  31   32  /*
  32   33   * Kernel task queues: general-purpose asynchronous task scheduling.
  33   34   *
  34   35   * A common problem in kernel programming is the need to schedule tasks
  35   36   * to be performed later, by another thread. There are several reasons
  36   37   * you may want or need to do this:
  37   38   *
  38   39   * (1) The task isn't time-critical, but your current code path is.
  39   40   *
  40   41   * (2) The task may require grabbing locks that you already hold.
  41   42   *
  42   43   * (3) The task may need to block (e.g. to wait for memory), but you
  43   44   *     cannot block in your current context.
  44   45   *
  45   46   * (4) Your code path can't complete because of some condition, but you can't
   46   47   *     sleep or fail, so you queue the task for later execution when the
   47   48   *     condition disappears.
  48   49   *
  49   50   * (5) You just want a simple way to launch multiple tasks in parallel.
  50   51   *
  51   52   * Task queues provide such a facility. In its simplest form (used when
  52   53   * performance is not a critical consideration) a task queue consists of a
  53   54   * single list of tasks, together with one or more threads to service the
  54   55   * list. There are some cases when this simple queue is not sufficient:
  55   56   *
  56   57   * (1) The task queues are very hot and there is a need to avoid data and lock
  57   58   *      contention over global resources.
  58   59   *
  59   60   * (2) Some tasks may depend on other tasks to complete, so they can't be put in
  60   61   *      the same list managed by the same thread.
  61   62   *
  62   63   * (3) Some tasks may block for a long time, and this should not block other
  63   64   *      tasks in the queue.
  64   65   *
  65   66   * To provide useful service in such cases we define a "dynamic task queue"
  66   67   * which has an individual thread for each of the tasks. These threads are
  67   68   * dynamically created as they are needed and destroyed when they are not in
  68   69   * use. The API for managing task pools is the same as for managing task queues
   69   70   * with the exception of the taskq creation flag TASKQ_DYNAMIC, which indicates
   70   71   * that dynamic task pool behavior is desired.
  71   72   *
  72   73   * Dynamic task queues may also place tasks in the normal queue (called "backing
   73   74   * queue") when the task pool runs out of resources. Users of task queues may
  74   75   * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch
  75   76   * flags.
  76   77   *
  77   78   * The backing task queue is also used for scheduling internal tasks needed for
  78   79   * dynamic task queue maintenance.
  79   80   *
  80   81   * INTERFACES ==================================================================
  81   82   *
  82   83   * taskq_t *taskq_create(name, nthreads, pri, minalloc, maxalloc, flags);
  83   84   *
  84   85   *      Create a taskq with specified properties.
  85   86   *      Possible 'flags':
  86   87   *
  87   88   *        TASKQ_DYNAMIC: Create task pool for task management. If this flag is
  88   89   *              specified, 'nthreads' specifies the maximum number of threads in
  89   90   *              the task queue. Task execution order for dynamic task queues is
  90   91   *              not predictable.
  91   92   *
  92   93   *              If this flag is not specified (default case) a
  93   94   *              single-list task queue is created with 'nthreads' threads
  94   95   *              servicing it. Entries in this queue are managed by
  95   96   *              taskq_ent_alloc() and taskq_ent_free() which try to keep the
  96   97   *              task population between 'minalloc' and 'maxalloc', but the
  97   98   *              latter limit is only advisory for TQ_SLEEP dispatches and the
  98   99   *              former limit is only advisory for TQ_NOALLOC dispatches. If
  99  100   *              TASKQ_PREPOPULATE is set in 'flags', the taskq will be
 100  101   *              prepopulated with 'minalloc' task structures.
 101  102   *
 102  103   *              Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be
 103  104   *              executed in the order they are scheduled if nthreads == 1.
 104  105   *              If nthreads > 1, task execution order is not predictable.
 105  106   *
 106  107   *        TASKQ_PREPOPULATE: Prepopulate task queue with threads.
 107  108   *              Also prepopulate the task queue with 'minalloc' task structures.
 108  109   *
 109  110   *        TASKQ_THREADS_CPU_PCT: This flag specifies that 'nthreads' should be
 110  111   *              interpreted as a percentage of the # of online CPUs on the
 111  112   *              system.  The taskq subsystem will automatically adjust the
 112  113   *              number of threads in the taskq in response to CPU online
 113  114   *              and offline events, to keep the ratio.  nthreads must be in
 114  115   *              the range [0,100].
 115  116   *
 116  117   *              The calculation used is:
 117  118   *
 118  119   *                      MAX((ncpus_online * percentage)/100, 1)
 119  120   *
 120  121   *              This flag is not supported for DYNAMIC task queues.
 121  122   *              This flag is not compatible with TASKQ_CPR_SAFE.
 122  123   *
 123  124   *        TASKQ_CPR_SAFE: This flag specifies that users of the task queue will
 124  125   *              use their own protocol for handling CPR issues. This flag is not
 125  126   *              supported for DYNAMIC task queues.  This flag is not compatible
 126  127   *              with TASKQ_THREADS_CPU_PCT.
 127  128   *
 128  129   *      The 'pri' field specifies the default priority for the threads that
 129  130   *      service all scheduled tasks.
 130  131   *
 131  132   * taskq_t *taskq_create_instance(name, instance, nthreads, pri, minalloc,
 132  133   *    maxalloc, flags);
 133  134   *
 134  135   *      Like taskq_create(), but takes an instance number (or -1 to indicate
 135  136   *      no instance).
 136  137   *
 137  138   * taskq_t *taskq_create_proc(name, nthreads, pri, minalloc, maxalloc, proc,
 138  139   *    flags);
 139  140   *
 140  141   *      Like taskq_create(), but creates the taskq threads in the specified
 141  142   *      system process.  If proc != &p0, this must be called from a thread
 142  143   *      in that process.
 143  144   *
 144  145   * taskq_t *taskq_create_sysdc(name, nthreads, minalloc, maxalloc, proc,
 145  146   *    dc, flags);
 146  147   *
 147  148   *      Like taskq_create_proc(), but the taskq threads will use the
 148  149   *      System Duty Cycle (SDC) scheduling class with a duty cycle of dc.
 149  150   *
 150  151   * void taskq_destroy(tap):
 151  152   *
 152  153   *      Waits for any scheduled tasks to complete, then destroys the taskq.
 153  154   *      Caller should guarantee that no new tasks are scheduled in the closing
 154  155   *      taskq.
 155  156   *
 156  157   * taskqid_t taskq_dispatch(tq, func, arg, flags):
 157  158   *
 158  159   *      Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether
 159  160   *      the caller is willing to block for memory.  The function returns an
 160  161   *      opaque value which is zero iff dispatch fails.  If flags is TQ_NOSLEEP
 161  162   *      or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails
 162  163   *      and returns (taskqid_t)0.
 163  164   *
 164  165   *      ASSUMES: func != NULL.
 165  166   *
 166  167   *      Possible flags:
 167  168   *        TQ_NOSLEEP: Do not wait for resources; may fail.
 168  169   *
 169  170   *        TQ_NOALLOC: Do not allocate memory; may fail.  May only be used with
 170  171   *              non-dynamic task queues.
 171  172   *
  172  173   *        TQ_NOQUEUE: Do not enqueue the task if it cannot be dispatched due to
  173  174   *              lack of available resources; fail instead. If this flag is not
 174  175   *              set, and the task pool is exhausted, the task may be scheduled
 175  176   *              in the backing queue. This flag may ONLY be used with dynamic
 176  177   *              task queues.
 177  178   *
 178  179   *              NOTE: This flag should always be used when a task queue is used
 179  180   *              for tasks that may depend on each other for completion.
 180  181   *              Enqueueing dependent tasks may create deadlocks.
 181  182   *
 182  183   *        TQ_SLEEP:   May block waiting for resources. May still fail for
  183  184   *              dynamic task queues if TQ_NOQUEUE is also specified; otherwise
  184  185   *              it always succeeds.
 185  186   *
 186  187   *        TQ_FRONT:   Puts the new task at the front of the queue.  Be careful.
 187  188   *
 188  189   *      NOTE: Dynamic task queues are much more likely to fail in
 189  190   *              taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it
 190  191   *              is important to have backup strategies handling such failures.
 191  192   *
 192  193   * void taskq_dispatch_ent(tq, func, arg, flags, tqent)
  
  
 193  194   *
 194  195   *      This is a light-weight form of taskq_dispatch(), that uses a
 195  196   *      preallocated taskq_ent_t structure for scheduling.  As a
 196  197   *      result, it does not perform allocations and cannot ever fail.
 197  198   *      Note especially that it cannot be used with TASKQ_DYNAMIC
 198  199   *      taskqs.  The memory for the tqent must not be modified or used
 199  200   *      until the function (func) is called.  (However, func itself
 200  201   *      may safely modify or free this memory, once it is called.)
 201  202   *      Note that the taskq framework will NOT free this memory.
 202  203   *
      204 + * boolean_t taskq_empty(tq)
      205 + *
       206 + *      Returns B_TRUE if the taskq has no pending tasks, B_FALSE otherwise.
      207 + *
 203  208   * void taskq_wait(tq):
 204  209   *
 205  210   *      Waits for all previously scheduled tasks to complete.
 206  211   *
 207  212   *      NOTE: It does not stop any new task dispatches.
 208  213   *            Do NOT call taskq_wait() from a task: it will cause deadlock.
 209  214   *
 210  215   * void taskq_suspend(tq)
 211  216   *
 212  217   *      Suspend all task execution. Tasks already scheduled for a dynamic task
  213  218   *      queue will still be executed, but all newly scheduled tasks will be
 214  219   *      suspended until taskq_resume() is called.
 215  220   *
 216  221   * int  taskq_suspended(tq)
 217  222   *
 218  223   *      Returns 1 if taskq is suspended and 0 otherwise. It is intended to
 219  224   *      ASSERT that the task queue is suspended.
 220  225   *
 221  226   * void taskq_resume(tq)
 222  227   *
 223  228   *      Resume task queue execution.
 224  229   *
 225  230   * int  taskq_member(tq, thread)
 226  231   *
 227  232   *      Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The
 228  233   *      intended use is to ASSERT that a given function is called in taskq
 229  234   *      context only.
 230  235   *
 231  236   * system_taskq
 232  237   *
 233  238   *      Global system-wide dynamic task queue for common uses. It may be used by
 234  239   *      any subsystem that needs to schedule tasks and does not need to manage
 235  240   *      its own task queues. It is initialized quite early during system boot.
 236  241   *
 237  242   * IMPLEMENTATION ==============================================================
 238  243   *
  239  244   * This is a schematic representation of the task queue structures.
 240  245   *
 241  246   *   taskq:
 242  247   *   +-------------+
 243  248   *   | tq_lock     | +---< taskq_ent_free()
 244  249   *   +-------------+ |
 245  250   *   |...          | | tqent:                  tqent:
 246  251   *   +-------------+ | +------------+          +------------+
 247  252   *   | tq_freelist |-->| tqent_next |--> ... ->| tqent_next |
 248  253   *   +-------------+   +------------+          +------------+
 249  254   *   |...          |   | ...        |          | ...        |
 250  255   *   +-------------+   +------------+          +------------+
 251  256   *   | tq_task     |    |
 252  257   *   |             |    +-------------->taskq_ent_alloc()
 253  258   * +--------------------------------------------------------------------------+
 254  259   * | |                     |            tqent                   tqent         |
 255  260   * | +---------------------+     +--> +------------+     +--> +------------+  |
 256  261   * | | ...                 |     |    | func, arg  |     |    | func, arg  |  |
 257  262   * +>+---------------------+ <---|-+  +------------+ <---|-+  +------------+  |
 258  263   *   | tq_taskq.tqent_next | ----+ |  | tqent_next | --->+ |  | tqent_next |--+
 259  264   *   +---------------------+       |  +------------+     ^ |  +------------+
 260  265   * +-| tq_task.tqent_prev  |       +--| tqent_prev |     | +--| tqent_prev |  ^
 261  266   * | +---------------------+          +------------+     |    +------------+  |
 262  267   * | |...                  |          | ...        |     |    | ...        |  |
 263  268   * | +---------------------+          +------------+     |    +------------+  |
 264  269   * |                                      ^              |                    |
 265  270   * |                                      |              |                    |
 266  271   * +--------------------------------------+--------------+       TQ_APPEND() -+
 267  272   *   |             |                      |
 268  273   *   |...          |   taskq_thread()-----+
 269  274   *   +-------------+
 270  275   *   | tq_buckets  |--+-------> [ NULL ] (for regular task queues)
 271  276   *   +-------------+  |
 272  277   *                    |   DYNAMIC TASK QUEUES:
 273  278   *                    |
 274  279   *                    +-> taskq_bucket[nCPU]            taskq_bucket_dispatch()
 275  280   *                        +-------------------+                    ^
 276  281   *                   +--->| tqbucket_lock     |                    |
 277  282   *                   |    +-------------------+   +--------+      +--------+
 278  283   *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  | ^
 279  284   *                   |    +-------------------+<--+--------+<--...+--------+ |
 280  285   *                   |    | ...               |   | thread |      | thread | |
 281  286   *                   |    +-------------------+   +--------+      +--------+ |
 282  287   *                   |    +-------------------+                              |
 283  288   * taskq_dispatch()--+--->| tqbucket_lock     |             TQ_APPEND()------+
 284  289   *      TQ_HASH()    |    +-------------------+   +--------+      +--------+
 285  290   *                   |    | tqbucket_freelist |-->| tqent  |-->...| tqent  |
 286  291   *                   |    +-------------------+<--+--------+<--...+--------+
 287  292   *                   |    | ...               |   | thread |      | thread |
 288  293   *                   |    +-------------------+   +--------+      +--------+
 289  294   *                   +--->      ...
 290  295   *
 291  296   *
  292  297   * Task queues use the tq_task field to link new entries into the queue. The
  293  298   * queue is a circular doubly-linked list. Entries are put at the end of the
  294  299   * list with TQ_APPEND() and processed from the front of the list by
  295  300   * taskq_thread() in FIFO order. Task queue entries are cached in the free
  296  301   * list managed by the taskq_ent_alloc() and taskq_ent_free() functions.
 297  302   *
  298  303   *      All threads used by task queues set the t_taskq field of the thread to
 299  304   *      point to the task queue.
 300  305   *
 301  306   * Taskq Thread Management -----------------------------------------------------
 302  307   *
 303  308   * Taskq's non-dynamic threads are managed with several variables and flags:
 304  309   *
 305  310   *      * tq_nthreads   - The number of threads in taskq_thread() for the
 306  311   *                        taskq.
 307  312   *
 308  313   *      * tq_active     - The number of threads not waiting on a CV in
 309  314   *                        taskq_thread(); includes newly created threads
 310  315   *                        not yet counted in tq_nthreads.
 311  316   *
 312  317   *      * tq_nthreads_target
 313  318   *                      - The number of threads desired for the taskq.
 314  319   *
 315  320   *      * tq_flags & TASKQ_CHANGING
 316  321   *                      - Indicates that tq_nthreads != tq_nthreads_target.
 317  322   *
 318  323   *      * tq_flags & TASKQ_THREAD_CREATED
 319  324   *                      - Indicates that a thread is being created in the taskq.
 320  325   *
 321  326   * During creation, tq_nthreads and tq_active are set to 0, and
 322  327   * tq_nthreads_target is set to the number of threads desired.  The
 323  328   * TASKQ_CHANGING flag is set, and taskq_thread_create() is called to
 324  329   * create the first thread. taskq_thread_create() increments tq_active,
 325  330   * sets TASKQ_THREAD_CREATED, and creates the new thread.
 326  331   *
 327  332   * Each thread starts in taskq_thread(), clears the TASKQ_THREAD_CREATED
 328  333   * flag, and increments tq_nthreads.  It stores the new value of
 329  334   * tq_nthreads as its "thread_id", and stores its thread pointer in the
  330  335   * tq_threadlist at index (thread_id - 1).  We keep the thread_id space
 331  336   * densely packed by requiring that only the largest thread_id can exit during
 332  337   * normal adjustment.   The exception is during the destruction of the
 333  338   * taskq; once tq_nthreads_target is set to zero, no new threads will be created
  334  339   * for the taskq, so every thread can exit without any ordering being
 335  340   * necessary.
 336  341   *
 337  342   * Threads will only process work if their thread id is <= tq_nthreads_target.
 338  343   *
 339  344   * When TASKQ_CHANGING is set, threads will check the current thread target
 340  345   * whenever they wake up, and do whatever they can to apply its effects.
 341  346   *
  342  347   * TASKQ_THREADS_CPU_PCT -------------------------------------------------------
  343  348   *
  344  349   * When a taskq is created with TASKQ_THREADS_CPU_PCT, we store the requested
  345  350   * percentage in tq_threads_ncpus_pct, start it off with the correct thread
  346  351   * target, and add it to the taskq_cpupct_list for later adjustment.
 347  352   *
 348  353   * We register taskq_cpu_setup() to be called whenever a CPU changes state.  It
 349  354   * walks the list of TASKQ_THREAD_CPU_PCT taskqs, adjusts their nthread_target
 350  355   * if need be, and wakes up all of the threads to process the change.
 351  356   *
 352  357   * Dynamic Task Queues Implementation ------------------------------------------
 353  358   *
  354  359   * For dynamic task queues there is a 1-to-1 mapping between a thread and a
  355  360   * taskq_ent_t structure. Each entry is serviced by its own thread and each
  356  361   * thread is controlled by a single entry.
 357  362   *
  358  363   * Entries are distributed over a set of buckets. To avoid modulo arithmetic,
  359  364   * the number of buckets is 2^n, determined by rounding the number of CPUs in
  360  365   * the system down to the nearest power of two. The tunable
 361  366   * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry
 362  367   * is attached to a bucket for its lifetime and can't migrate to other buckets.
 363  368   *
 364  369   * Entries that have scheduled tasks are not placed in any list. The dispatch
 365  370   * function sets their "func" and "arg" fields and signals the corresponding
 366  371   * thread to execute the task. Once the thread executes the task it clears the
  367  372   * "func" field and places the entry on the bucket cache of free entries
  368  373   * pointed to by the "tqbucket_freelist" field. ALL free list entries have "func"
 369  374   * field equal to NULL. The free list is a circular doubly-linked list identical
 370  375   * in structure to the tq_task list above, but entries are taken from it in LIFO
 371  376   * order - the last freed entry is the first to be allocated. The
 372  377   * taskq_bucket_dispatch() function gets the most recently used entry from the
 373  378   * free list, sets its "func" and "arg" fields and signals a worker thread.
 374  379   *
 375  380   * After executing each task a per-entry thread taskq_d_thread() places its
 376  381   * entry on the bucket free list and goes to a timed sleep. If it wakes up
  377  382   * without getting a new task it removes the entry from the free list and
  378  383   * destroys itself. The thread sleep time is controlled by the tunable variable
 379  384   * `taskq_thread_timeout'.
 380  385   *
  381  386   * There are various statistics kept in the bucket which allow for later
 382  387   * analysis of taskq usage patterns. Also, a global copy of taskq creation and
 383  388   * death statistics is kept in the global taskq data structure. Since thread
 384  389   * creation and death happen rarely, updating such global data does not present
 385  390   * a performance problem.
 386  391   *
 387  392   * NOTE: Threads are not bound to any CPU and there is absolutely no association
 388  393   *       between the bucket and actual thread CPU, so buckets are used only to
 389  394   *       split resources and reduce resource contention. Having threads attached
  390  395   *       to the CPU denoted by a bucket may reduce the number of times the job
 391  396   *       switches between CPUs.
 392  397   *
  393  398   *       The current algorithm creates a thread whenever a bucket has no free
 394  399   *       entries. It would be nice to know how many threads are in the running
 395  400   *       state and don't create threads if all CPUs are busy with existing
  396  401   *       tasks, but it is unclear how such a strategy can be implemented.
 397  402   *
  398  403   *       Currently buckets are created statically as an array attached to the
  399  404   *       task queue. On some systems with nCPUs < max_ncpus this may waste system
 400  405   *       memory. One solution may be allocation of buckets when they are first
 401  406   *       touched, but it is not clear how useful it is.
 402  407   *
 403  408   * SUSPEND/RESUME implementation -----------------------------------------------
 404  409   *
 405  410   *      Before executing a task taskq_thread() (executing non-dynamic task
  406  411   *      queues) obtains the taskq's thread lock as a reader. The taskq_suspend()
  407  412   *      function takes the same lock as a writer, blocking all non-dynamic task
 408  413   *      execution. The taskq_resume() function releases the lock allowing
 409  414   *      taskq_thread to continue execution.
 410  415   *
 411  416   *      For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
 412  417   *      taskq_suspend() function. After that taskq_bucket_dispatch() always
 413  418   *      fails, so that taskq_dispatch() will either enqueue tasks for a
 414  419   *      suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch
 415  420   *      flags.
 416  421   *
 417  422   *      NOTE: taskq_suspend() does not immediately block any tasks already
 418  423   *            scheduled for dynamic task queues. It only suspends new tasks
 419  424   *            scheduled after taskq_suspend() was called.
 420  425   *
  421  426   *      The taskq_member() function works by comparing the thread's t_taskq
  422  427   *      pointer with the passed taskq pointer.
 423  428   *
 424  429   * LOCKS and LOCK Hierarchy ----------------------------------------------------
 425  430   *
 426  431   *   There are three locks used in task queues:
 427  432   *
 428  433   *   1) The taskq_t's tq_lock, protecting global task queue state.
 429  434   *
 430  435   *   2) Each per-CPU bucket has a lock for bucket management.
 431  436   *
 432  437   *   3) The global taskq_cpupct_lock, which protects the list of
 433  438   *      TASKQ_THREADS_CPU_PCT taskqs.
 434  439   *
 435  440   *   If both (1) and (2) are needed, tq_lock should be taken *after* the bucket
 436  441   *   lock.
 437  442   *
 438  443   *   If both (1) and (3) are needed, tq_lock should be taken *after*
 439  444   *   taskq_cpupct_lock.
 440  445   *
 441  446   * DEBUG FACILITIES ------------------------------------------------------------
 442  447   *
  443  448   * For DEBUG kernels it is possible to induce random failures in the
  444  449   * taskq_dispatch() function when it is given the TQ_NOSLEEP argument. The
  445  450   * values of the taskq_dmtbf and taskq_smtbf tunables control the mean time
  446  451   * between induced failures for dynamic and static task queues respectively.
 447  452   *
 448  453   * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics.
 449  454   *
 450  455   * TUNABLES --------------------------------------------------------------------
 451  456   *
 452  457   *      system_taskq_size       - Size of the global system_taskq.
 453  458   *                                This value is multiplied by nCPUs to determine
 454  459   *                                actual size.
 455  460   *                                Default value: 64
 456  461   *
 457  462   *      taskq_minimum_nthreads_max
 458  463   *                              - Minimum size of the thread list for a taskq.
 459  464   *                                Useful for testing different thread pool
 460  465   *                                sizes by overwriting tq_nthreads_target.
 461  466   *
 462  467   *      taskq_thread_timeout    - Maximum idle time for taskq_d_thread()
 463  468   *                                Default value: 5 minutes
 464  469   *
 465  470   *      taskq_maxbuckets        - Maximum number of buckets in any task queue
 466  471   *                                Default value: 128
 467  472   *
 468  473   *      taskq_search_depth      - Maximum # of buckets searched for a free entry
 469  474   *                                Default value: 4
 470  475   *
 471  476   *      taskq_dmtbf             - Mean time between induced dispatch failures
 472  477   *                                for dynamic task queues.
 473  478   *                                Default value: UINT_MAX (no induced failures)
 474  479   *
 475  480   *      taskq_smtbf             - Mean time between induced dispatch failures
 476  481   *                                for static task queues.
 477  482   *                                Default value: UINT_MAX (no induced failures)
 478  483   *
 479  484   * CONDITIONAL compilation -----------------------------------------------------
 480  485   *
 481  486   *    TASKQ_STATISTIC   - If set will enable bucket statistic (default).
 482  487   *
 483  488   */
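
    As a concrete illustration of the interfaces documented in the block comment
    above, here is a minimal, hypothetical consumer. The my_* names are invented
    for this sketch and are not part of this change; error handling is reduced
    to comments.

        static void
        my_worker(void *arg)
        {
                /* Deferred work runs here, in taskq thread context. */
        }

        static void
        my_example(void)
        {
                taskq_t *tq;

                /* One servicing thread, prepopulated with 4 task structures. */
                tq = taskq_create("my_taskq", 1, minclsyspri, 4, 8,
                    TASKQ_PREPOPULATE);

                if (taskq_dispatch(tq, my_worker, NULL, TQ_NOSLEEP) ==
                    (taskqid_t)0) {
                        /* TQ_NOSLEEP dispatches may fail; fall back here. */
                }

                taskq_wait(tq);                 /* drain dispatched tasks */
                ASSERT(taskq_empty(tq));        /* predicate added by this change */
                taskq_destroy(tq);
        }
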
 484  489  
 485  490  #include <sys/taskq_impl.h>
 486  491  #include <sys/thread.h>
 487  492  #include <sys/proc.h>
 488  493  #include <sys/kmem.h>
 489  494  #include <sys/vmem.h>
 490  495  #include <sys/callb.h>
 491  496  #include <sys/class.h>
 492  497  #include <sys/systm.h>
 493  498  #include <sys/cmn_err.h>
 494  499  #include <sys/debug.h>
 495  500  #include <sys/vmsystm.h>        /* For throttlefree */
 496  501  #include <sys/sysmacros.h>
 497  502  #include <sys/cpuvar.h>
 498  503  #include <sys/cpupart.h>
 499  504  #include <sys/sdt.h>
 500  505  #include <sys/sysdc.h>
 501  506  #include <sys/note.h>
 502  507  
 503  508  static kmem_cache_t *taskq_ent_cache, *taskq_cache;
 504  509  
 505  510  /*
 506  511   * Pseudo instance numbers for taskqs without explicitly provided instance.
 507  512   */
 508  513  static vmem_t *taskq_id_arena;
 509  514  
 510  515  /* Global system task queue for common use */
 511  516  taskq_t *system_taskq;
 512  517  
 513  518  /*
 514  519   * Maximum number of entries in global system taskq is
 515  520   *      system_taskq_size * max_ncpus
 516  521   */
 517  522  #define SYSTEM_TASKQ_SIZE 64
 518  523  int system_taskq_size = SYSTEM_TASKQ_SIZE;
 519  524  
 520  525  /*
 521  526   * Minimum size for tq_nthreads_max; useful for those who want to play around
 522  527   * with increasing a taskq's tq_nthreads_target.
 523  528   */
 524  529  int taskq_minimum_nthreads_max = 1;
 525  530  
 526  531  /*
 527  532   * We want to ensure that when taskq_create() returns, there is at least
 528  533   * one thread ready to handle requests.  To guarantee this, we have to wait
 529  534   * for the second thread, since the first one cannot process requests until
 530  535   * the second thread has been created.
 531  536   */
 532  537  #define TASKQ_CREATE_ACTIVE_THREADS     2
 533  538  
 534  539  /* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */
 535  540  #define TASKQ_CPUPCT_MAX_PERCENT        1000
 536  541  int taskq_cpupct_max_percent = TASKQ_CPUPCT_MAX_PERCENT;
 537  542  
 538  543  /*
 539  544   * Dynamic task queue threads that don't get any work within
  540  545   * taskq_thread_timeout destroy themselves.
 541  546   */
 542  547  #define TASKQ_THREAD_TIMEOUT (60 * 5)
 543  548  int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT;
 544  549  
 545  550  #define TASKQ_MAXBUCKETS 128
 546  551  int taskq_maxbuckets = TASKQ_MAXBUCKETS;
 547  552  
 548  553  /*
  549  554   * When a bucket has no available entries, other buckets are tried. The
  550  555   * taskq_search_depth parameter limits the number of buckets that we search
 551  556   * before failing. This is mostly useful in systems with many CPUs where we may
 552  557   * spend too much time scanning busy buckets.
 553  558   */
 554  559  #define TASKQ_SEARCH_DEPTH 4
 555  560  int taskq_search_depth = TASKQ_SEARCH_DEPTH;
 556  561  
 557  562  /*
 558  563   * Hashing function: mix various bits of x. May be pretty much anything.
 559  564   */
 560  565  #define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27))
 561  566  
 562  567  /*
  563  568   * We do not create any new threads when the system is low on memory and starts
  564  569   * throttling memory allocations. The following macro tries to estimate such a
  565  570   * condition.
 566  571   */
 567  572  #define ENOUGH_MEMORY() (freemem > throttlefree)
 568  573  
 569  574  /*
 570  575   * Static functions.
 571  576   */
 572  577  static taskq_t  *taskq_create_common(const char *, int, int, pri_t, int,
 573  578      int, proc_t *, uint_t, uint_t);
 574  579  static void taskq_thread(void *);
 575  580  static void taskq_d_thread(taskq_ent_t *);
 576  581  static void taskq_bucket_extend(void *);
 577  582  static int  taskq_constructor(void *, void *, int);
 578  583  static void taskq_destructor(void *, void *);
 579  584  static int  taskq_ent_constructor(void *, void *, int);
 580  585  static void taskq_ent_destructor(void *, void *);
 581  586  static taskq_ent_t *taskq_ent_alloc(taskq_t *, int);
 582  587  static void taskq_ent_free(taskq_t *, taskq_ent_t *);
 583  588  static int taskq_ent_exists(taskq_t *, task_func_t, void *);
 584  589  static taskq_ent_t *taskq_bucket_dispatch(taskq_bucket_t *, task_func_t,
 585  590      void *);
 586  591  
 587  592  /*
 588  593   * Task queues kstats.
 589  594   */
 590  595  struct taskq_kstat {
 591  596          kstat_named_t   tq_pid;
 592  597          kstat_named_t   tq_tasks;
 593  598          kstat_named_t   tq_executed;
 594  599          kstat_named_t   tq_maxtasks;
 595  600          kstat_named_t   tq_totaltime;
 596  601          kstat_named_t   tq_nalloc;
 597  602          kstat_named_t   tq_nactive;
 598  603          kstat_named_t   tq_pri;
 599  604          kstat_named_t   tq_nthreads;
 600  605          kstat_named_t   tq_nomem;
 601  606  } taskq_kstat = {
 602  607          { "pid",                KSTAT_DATA_UINT64 },
 603  608          { "tasks",              KSTAT_DATA_UINT64 },
 604  609          { "executed",           KSTAT_DATA_UINT64 },
 605  610          { "maxtasks",           KSTAT_DATA_UINT64 },
 606  611          { "totaltime",          KSTAT_DATA_UINT64 },
 607  612          { "nalloc",             KSTAT_DATA_UINT64 },
 608  613          { "nactive",            KSTAT_DATA_UINT64 },
 609  614          { "priority",           KSTAT_DATA_UINT64 },
 610  615          { "threads",            KSTAT_DATA_UINT64 },
 611  616          { "nomem",              KSTAT_DATA_UINT64 },
 612  617  };
 613  618  
 614  619  struct taskq_d_kstat {
 615  620          kstat_named_t   tqd_pri;
 616  621          kstat_named_t   tqd_btasks;
 617  622          kstat_named_t   tqd_bexecuted;
 618  623          kstat_named_t   tqd_bmaxtasks;
 619  624          kstat_named_t   tqd_bnalloc;
 620  625          kstat_named_t   tqd_bnactive;
 621  626          kstat_named_t   tqd_btotaltime;
 622  627          kstat_named_t   tqd_hits;
 623  628          kstat_named_t   tqd_misses;
 624  629          kstat_named_t   tqd_overflows;
 625  630          kstat_named_t   tqd_tcreates;
 626  631          kstat_named_t   tqd_tdeaths;
 627  632          kstat_named_t   tqd_maxthreads;
 628  633          kstat_named_t   tqd_nomem;
 629  634          kstat_named_t   tqd_disptcreates;
 630  635          kstat_named_t   tqd_totaltime;
 631  636          kstat_named_t   tqd_nalloc;
 632  637          kstat_named_t   tqd_nfree;
 633  638  } taskq_d_kstat = {
 634  639          { "priority",           KSTAT_DATA_UINT64 },
 635  640          { "btasks",             KSTAT_DATA_UINT64 },
 636  641          { "bexecuted",          KSTAT_DATA_UINT64 },
 637  642          { "bmaxtasks",          KSTAT_DATA_UINT64 },
 638  643          { "bnalloc",            KSTAT_DATA_UINT64 },
 639  644          { "bnactive",           KSTAT_DATA_UINT64 },
 640  645          { "btotaltime",         KSTAT_DATA_UINT64 },
 641  646          { "hits",               KSTAT_DATA_UINT64 },
 642  647          { "misses",             KSTAT_DATA_UINT64 },
 643  648          { "overflows",          KSTAT_DATA_UINT64 },
 644  649          { "tcreates",           KSTAT_DATA_UINT64 },
 645  650          { "tdeaths",            KSTAT_DATA_UINT64 },
 646  651          { "maxthreads",         KSTAT_DATA_UINT64 },
 647  652          { "nomem",              KSTAT_DATA_UINT64 },
 648  653          { "disptcreates",       KSTAT_DATA_UINT64 },
 649  654          { "totaltime",          KSTAT_DATA_UINT64 },
 650  655          { "nalloc",             KSTAT_DATA_UINT64 },
 651  656          { "nfree",              KSTAT_DATA_UINT64 },
 652  657  };
 653  658  
 654  659  static kmutex_t taskq_kstat_lock;
 655  660  static kmutex_t taskq_d_kstat_lock;
 656  661  static int taskq_kstat_update(kstat_t *, int);
 657  662  static int taskq_d_kstat_update(kstat_t *, int);
 658  663  
 659  664  /*
 660  665   * List of all TASKQ_THREADS_CPU_PCT taskqs.
 661  666   */
 662  667  static list_t taskq_cpupct_list;        /* protected by cpu_lock */
 663  668  
 664  669  /*
 665  670   * Collect per-bucket statistic when TASKQ_STATISTIC is defined.
 666  671   */
 667  672  #define TASKQ_STATISTIC 1
 668  673  
 669  674  #if TASKQ_STATISTIC
 670  675  #define TQ_STAT(b, x)   b->tqbucket_stat.x++
 671  676  #else
 672  677  #define TQ_STAT(b, x)
 673  678  #endif
 674  679  
 675  680  /*
 676  681   * Random fault injection.
 677  682   */
 678  683  uint_t taskq_random;
 679  684  uint_t taskq_dmtbf = UINT_MAX;    /* mean time between injected failures */
 680  685  uint_t taskq_smtbf = UINT_MAX;    /* mean time between injected failures */
 681  686  
 682  687  /*
 683  688   * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail.
 684  689   *
 685  690   * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because
 686  691   * they could prepopulate the cache and make sure that they do not use more
  687  692   * than minalloc entries.  So, fault injection in this case ensures that
 688  693   * either TASKQ_PREPOPULATE is not set or there are more entries allocated
 689  694   * than is specified by minalloc.  TQ_NOALLOC dispatches are always allowed
 690  695   * to fail, but for simplicity we treat them identically to TQ_NOSLEEP
 691  696   * dispatches.
 692  697   */
 693  698  #ifdef DEBUG
 694  699  #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)               \
 695  700          taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
 696  701          if ((flag & TQ_NOSLEEP) &&                              \
 697  702              taskq_random < 1771875 / taskq_dmtbf) {             \
 698  703                  return (NULL);                                  \
 699  704          }
 700  705  
 701  706  #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)               \
 702  707          taskq_random = (taskq_random * 2416 + 374441) % 1771875;\
 703  708          if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) &&               \
 704  709              (!(tq->tq_flags & TASKQ_PREPOPULATE) ||             \
 705  710              (tq->tq_nalloc > tq->tq_minalloc)) &&               \
 706  711              (taskq_random < (1771875 / taskq_smtbf))) {         \
 707  712                  mutex_exit(&tq->tq_lock);                       \
 708  713                  return (NULL);                                  \
 709  714          }
 710  715  #else
 711  716  #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
 712  717  #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
 713  718  #endif
 714  719  
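
    The DEBUG-only macros above are driven by the taskq_dmtbf and taskq_smtbf
    tunables declared earlier and described in the DEBUG FACILITIES section of
    the header comment. As a hedged example of exercising the TQ_NOSLEEP failure
    paths on a DEBUG kernel, the tunables can be lowered from their UINT_MAX
    defaults, e.g. via /etc/system (the values shown are arbitrary):

        * Induce frequent TQ_NOSLEEP dispatch failures (DEBUG kernels only).
        set taskq_dmtbf = 16
        set taskq_smtbf = 16
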
 715  720  #define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) &&      \
 716  721          ((l).tqent_prev == &(l)))
 717  722  
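
    The taskq_empty() interface documented earlier is what this change
    introduces, but its body lies outside this excerpt. Below is a minimal
    sketch of such a predicate for non-dynamic taskqs, written against the
    IS_EMPTY() macro above and the tq_active count described in the header
    comment; it is an illustration of the idea, not necessarily the hunk as
    committed.

        boolean_t
        taskq_empty(taskq_t *tq)
        {
                boolean_t rv;

                mutex_enter(&tq->tq_lock);
                /* Empty means no queued entries and no tasks still executing. */
                rv = IS_EMPTY(tq->tq_task) && (tq->tq_active == 0);
                mutex_exit(&tq->tq_lock);

                return (rv);
        }

    Like taskq_wait(), such a check is intended to be made from outside the
    taskq's own threads.
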
 718  723  /*
 719  724   * Append `tqe' in the end of the doubly-linked list denoted by l.
 720  725   */
 721  726  #define TQ_APPEND(l, tqe) {                                     \
 722  727          tqe->tqent_next = &l;                                   \
 723  728          tqe->tqent_prev = l.tqent_prev;                         \
 724  729          tqe->tqent_next->tqent_prev = tqe;                      \
 725  730          tqe->tqent_prev->tqent_next = tqe;                      \
 726  731  }
 727  732  /*
 728  733   * Prepend 'tqe' to the beginning of l
 729  734   */
 730  735  #define TQ_PREPEND(l, tqe) {                                    \
 731  736          tqe->tqent_next = l.tqent_next;                         \
 732  737          tqe->tqent_prev = &l;                                   \
 733  738          tqe->tqent_next->tqent_prev = tqe;                      \
 734  739          tqe->tqent_prev->tqent_next = tqe;                      \
 735  740  }
 736  741  
 737  742  /*
 738  743   * Schedule a task specified by func and arg into the task queue entry tqe.
 739  744   */
 740  745  #define TQ_DO_ENQUEUE(tq, tqe, func, arg, front) {                      \
 741  746          ASSERT(MUTEX_HELD(&tq->tq_lock));                               \
 742  747          _NOTE(CONSTCOND)                                                \
 743  748          if (front) {                                                    \
 744  749                  TQ_PREPEND(tq->tq_task, tqe);                           \
 745  750          } else {                                                        \
 746  751                  TQ_APPEND(tq->tq_task, tqe);                            \
 747  752          }                                                               \
 748  753          tqe->tqent_func = (func);                                       \
 749  754          tqe->tqent_arg = (arg);                                         \
 750  755          tq->tq_tasks++;                                                 \
 751  756          if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks)           \
 752  757                  tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed;       \
 753  758          cv_signal(&tq->tq_dispatch_cv);                                 \
 754  759          DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \
 755  760  }
 756  761  
 757  762  #define TQ_ENQUEUE(tq, tqe, func, arg)                                  \
 758  763          TQ_DO_ENQUEUE(tq, tqe, func, arg, 0)
 759  764  
 760  765  #define TQ_ENQUEUE_FRONT(tq, tqe, func, arg)                            \
 761  766          TQ_DO_ENQUEUE(tq, tqe, func, arg, 1)
 762  767  
 763  768  /*
 764  769   * Do-nothing task which may be used to prepopulate thread caches.
 765  770   */
 766  771  /*ARGSUSED*/
 767  772  void
 768  773  nulltask(void *unused)
 769  774  {
 770  775  }
 771  776  
 772  777  /*ARGSUSED*/
 773  778  static int
 774  779  taskq_constructor(void *buf, void *cdrarg, int kmflags)
 775  780  {
 776  781          taskq_t *tq = buf;
 777  782  
 778  783          bzero(tq, sizeof (taskq_t));
 779  784  
 780  785          mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
 781  786          rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
 782  787          cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
 783  788          cv_init(&tq->tq_exit_cv, NULL, CV_DEFAULT, NULL);
 784  789          cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
 785  790          cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
 786  791  
 787  792          tq->tq_task.tqent_next = &tq->tq_task;
 788  793          tq->tq_task.tqent_prev = &tq->tq_task;
 789  794  
 790  795          return (0);
 791  796  }
 792  797  
 793  798  /*ARGSUSED*/
 794  799  static void
 795  800  taskq_destructor(void *buf, void *cdrarg)
 796  801  {
 797  802          taskq_t *tq = buf;
 798  803  
 799  804          ASSERT(tq->tq_nthreads == 0);
 800  805          ASSERT(tq->tq_buckets == NULL);
 801  806          ASSERT(tq->tq_tcreates == 0);
 802  807          ASSERT(tq->tq_tdeaths == 0);
 803  808  
 804  809          mutex_destroy(&tq->tq_lock);
 805  810          rw_destroy(&tq->tq_threadlock);
 806  811          cv_destroy(&tq->tq_dispatch_cv);
 807  812          cv_destroy(&tq->tq_exit_cv);
 808  813          cv_destroy(&tq->tq_wait_cv);
 809  814          cv_destroy(&tq->tq_maxalloc_cv);
 810  815  }
 811  816  
 812  817  /*ARGSUSED*/
 813  818  static int
 814  819  taskq_ent_constructor(void *buf, void *cdrarg, int kmflags)
 815  820  {
 816  821          taskq_ent_t *tqe = buf;
 817  822  
 818  823          tqe->tqent_thread = NULL;
 819  824          cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL);
 820  825  
 821  826          return (0);
 822  827  }
 823  828  
 824  829  /*ARGSUSED*/
 825  830  static void
 826  831  taskq_ent_destructor(void *buf, void *cdrarg)
 827  832  {
 828  833          taskq_ent_t *tqe = buf;
 829  834  
 830  835          ASSERT(tqe->tqent_thread == NULL);
 831  836          cv_destroy(&tqe->tqent_cv);
 832  837  }
 833  838  
 834  839  void
 835  840  taskq_init(void)
 836  841  {
 837  842          taskq_ent_cache = kmem_cache_create("taskq_ent_cache",
 838  843              sizeof (taskq_ent_t), 0, taskq_ent_constructor,
 839  844              taskq_ent_destructor, NULL, NULL, NULL, 0);
 840  845          taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t),
 841  846              0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0);
 842  847          taskq_id_arena = vmem_create("taskq_id_arena",
 843  848              (void *)1, INT32_MAX, 1, NULL, NULL, NULL, 0,
 844  849              VM_SLEEP | VMC_IDENTIFIER);
 845  850  
 846  851          list_create(&taskq_cpupct_list, sizeof (taskq_t),
 847  852              offsetof(taskq_t, tq_cpupct_link));
 848  853  }
 849  854  
 850  855  static void
 851  856  taskq_update_nthreads(taskq_t *tq, uint_t ncpus)
 852  857  {
 853  858          uint_t newtarget = TASKQ_THREADS_PCT(ncpus, tq->tq_threads_ncpus_pct);
 854  859  
 855  860          ASSERT(MUTEX_HELD(&cpu_lock));
 856  861          ASSERT(MUTEX_HELD(&tq->tq_lock));
 857  862  
 858  863          /* We must be going from non-zero to non-zero; no exiting. */
 859  864          ASSERT3U(tq->tq_nthreads_target, !=, 0);
 860  865          ASSERT3U(newtarget, !=, 0);
 861  866  
 862  867          ASSERT3U(newtarget, <=, tq->tq_nthreads_max);
 863  868          if (newtarget != tq->tq_nthreads_target) {
 864  869                  tq->tq_flags |= TASKQ_CHANGING;
 865  870                  tq->tq_nthreads_target = newtarget;
 866  871                  cv_broadcast(&tq->tq_dispatch_cv);
 867  872                  cv_broadcast(&tq->tq_exit_cv);
 868  873          }
 869  874  }
 870  875  
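
    taskq_update_nthreads() recomputes the thread target using the percentage
    formula quoted in the header comment, MAX((ncpus_online * percentage)/100, 1).
    A small worked example with hypothetical numbers:

        /*
         * A taskq created with TASKQ_THREADS_CPU_PCT and nthreads = 50 (50%)
         * on a partition with 8 online CPUs gets:
         *
         *      newtarget = MAX((8 * 50) / 100, 1) = 4
         *
         * If the partition later shrinks to a single online CPU, the
         * CPU-state callback recomputes:
         *
         *      newtarget = MAX((1 * 50) / 100, 1) = 1
         *
         * so the taskq never drops below one servicing thread.
         */
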
 871  876  /* called during task queue creation */
 872  877  static void
 873  878  taskq_cpupct_install(taskq_t *tq, cpupart_t *cpup)
 874  879  {
 875  880          ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
 876  881  
 877  882          mutex_enter(&cpu_lock);
 878  883          mutex_enter(&tq->tq_lock);
 879  884          tq->tq_cpupart = cpup->cp_id;
 880  885          taskq_update_nthreads(tq, cpup->cp_ncpus);
 881  886          mutex_exit(&tq->tq_lock);
 882  887  
 883  888          list_insert_tail(&taskq_cpupct_list, tq);
 884  889          mutex_exit(&cpu_lock);
 885  890  }
 886  891  
 887  892  static void
 888  893  taskq_cpupct_remove(taskq_t *tq)
 889  894  {
 890  895          ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
 891  896  
 892  897          mutex_enter(&cpu_lock);
 893  898          list_remove(&taskq_cpupct_list, tq);
 894  899          mutex_exit(&cpu_lock);
 895  900  }
 896  901  
 897  902  /*ARGSUSED*/
 898  903  static int
 899  904  taskq_cpu_setup(cpu_setup_t what, int id, void *arg)
 900  905  {
 901  906          taskq_t *tq;
 902  907          cpupart_t *cp = cpu[id]->cpu_part;
 903  908          uint_t ncpus = cp->cp_ncpus;
 904  909  
 905  910          ASSERT(MUTEX_HELD(&cpu_lock));
 906  911          ASSERT(ncpus > 0);
 907  912  
 908  913          switch (what) {
 909  914          case CPU_OFF:
 910  915          case CPU_CPUPART_OUT:
 911  916                  /* offlines are called *before* the cpu is offlined. */
 912  917                  if (ncpus > 1)
 913  918                          ncpus--;
 914  919                  break;
 915  920  
 916  921          case CPU_ON:
 917  922          case CPU_CPUPART_IN:
 918  923                  break;
 919  924  
 920  925          default:
 921  926                  return (0);             /* doesn't affect cpu count */
 922  927          }
 923  928  
 924  929          for (tq = list_head(&taskq_cpupct_list); tq != NULL;
 925  930              tq = list_next(&taskq_cpupct_list, tq)) {
 926  931  
 927  932                  mutex_enter(&tq->tq_lock);
 928  933                  /*
 929  934                   * If the taskq is part of the cpuset which is changing,
 930  935                   * update its nthreads_target.
 931  936                   */
 932  937                  if (tq->tq_cpupart == cp->cp_id) {
 933  938                          taskq_update_nthreads(tq, ncpus);
 934  939                  }
 935  940                  mutex_exit(&tq->tq_lock);
 936  941          }
 937  942          return (0);
 938  943  }
 939  944  
 940  945  void
 941  946  taskq_mp_init(void)
 942  947  {
 943  948          mutex_enter(&cpu_lock);
 944  949          register_cpu_setup_func(taskq_cpu_setup, NULL);
 945  950          /*
 946  951           * Make sure we're up to date.  At this point in boot, there is only
 947  952           * one processor set, so we only have to update the current CPU.
 948  953           */
 949  954          (void) taskq_cpu_setup(CPU_ON, CPU->cpu_id, NULL);
 950  955          mutex_exit(&cpu_lock);
 951  956  }
 952  957  
 953  958  /*
 954  959   * Create global system dynamic task queue.
 955  960   */
 956  961  void
 957  962  system_taskq_init(void)
 958  963  {
 959  964          system_taskq = taskq_create_common("system_taskq", 0,
 960  965              system_taskq_size * max_ncpus, minclsyspri, 4, 512, &p0, 0,
 961  966              TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 962  967  }
 963  968  
 964  969  /*
 965  970   * taskq_ent_alloc()
 966  971   *
 967  972   * Allocates a new taskq_ent_t structure either from the free list or from the
 968  973   * cache. Returns NULL if it can't be allocated.
 969  974   *
 970  975   * Assumes: tq->tq_lock is held.
 971  976   */
 972  977  static taskq_ent_t *
 973  978  taskq_ent_alloc(taskq_t *tq, int flags)
 974  979  {
 975  980          int kmflags = (flags & TQ_NOSLEEP) ? KM_NOSLEEP : KM_SLEEP;
 976  981          taskq_ent_t *tqe;
 977  982          clock_t wait_time;
 978  983          clock_t wait_rv;
 979  984  
 980  985          ASSERT(MUTEX_HELD(&tq->tq_lock));
 981  986  
 982  987          /*
 983  988           * TQ_NOALLOC allocations are allowed to use the freelist, even if
 984  989           * we are below tq_minalloc.
 985  990           */
 986  991  again:  if ((tqe = tq->tq_freelist) != NULL &&
 987  992              ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) {
 988  993                  tq->tq_freelist = tqe->tqent_next;
 989  994          } else {
 990  995                  if (flags & TQ_NOALLOC)
 991  996                          return (NULL);
 992  997  
 993  998                  if (tq->tq_nalloc >= tq->tq_maxalloc) {
 994  999                          if (kmflags & KM_NOSLEEP)
 995 1000                                  return (NULL);
 996 1001  
 997 1002                          /*
 998 1003                           * We don't want to exceed tq_maxalloc, but we can't
 999 1004                           * wait for other tasks to complete (and thus free up
1000 1005                           * task structures) without risking deadlock with
1001 1006                           * the caller.  So, we just delay for one second
 1002 1007                           * to throttle the allocation rate. If tasks
 1003 1008                           * complete before the one-second timeout expires,
 1004 1009                           * taskq_ent_free() will signal us and we will
1005 1010                           * immediately retry the allocation (reap free).
1006 1011                           */
1007 1012                          wait_time = ddi_get_lbolt() + hz;
1008 1013                          while (tq->tq_freelist == NULL) {
1009 1014                                  tq->tq_maxalloc_wait++;
1010 1015                                  wait_rv = cv_timedwait(&tq->tq_maxalloc_cv,
1011 1016                                      &tq->tq_lock, wait_time);
1012 1017                                  tq->tq_maxalloc_wait--;
1013 1018                                  if (wait_rv == -1)
1014 1019                                          break;
1015 1020                          }
1016 1021                          if (tq->tq_freelist)
1017 1022                                  goto again;             /* reap freelist */
1018 1023  
1019 1024                  }
1020 1025                  mutex_exit(&tq->tq_lock);
1021 1026  
1022 1027                  tqe = kmem_cache_alloc(taskq_ent_cache, kmflags);
1023 1028  
1024 1029                  mutex_enter(&tq->tq_lock);
1025 1030                  if (tqe != NULL)
1026 1031                          tq->tq_nalloc++;
1027 1032          }
1028 1033          return (tqe);
1029 1034  }
1030 1035  
1031 1036  /*
1032 1037   * taskq_ent_free()
1033 1038   *
1034 1039   * Free taskq_ent_t structure by either putting it on the free list or freeing
1035 1040   * it to the cache.
1036 1041   *
1037 1042   * Assumes: tq->tq_lock is held.
1038 1043   */
1039 1044  static void
1040 1045  taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe)
1041 1046  {
1042 1047          ASSERT(MUTEX_HELD(&tq->tq_lock));
1043 1048  
1044 1049          if (tq->tq_nalloc <= tq->tq_minalloc) {
1045 1050                  tqe->tqent_next = tq->tq_freelist;
1046 1051                  tq->tq_freelist = tqe;
1047 1052          } else {
1048 1053                  tq->tq_nalloc--;
1049 1054                  mutex_exit(&tq->tq_lock);
1050 1055                  kmem_cache_free(taskq_ent_cache, tqe);
1051 1056                  mutex_enter(&tq->tq_lock);
1052 1057          }
1053 1058  
1054 1059          if (tq->tq_maxalloc_wait)
1055 1060                  cv_signal(&tq->tq_maxalloc_cv);
1056 1061  }
1057 1062  
1058 1063  /*
1059 1064   * taskq_ent_exists()
1060 1065   *
 1061 1066   * Return 1 if the taskq already has an entry for calling 'func(arg)'.
1062 1067   *
1063 1068   * Assumes: tq->tq_lock is held.
1064 1069   */
1065 1070  static int
1066 1071  taskq_ent_exists(taskq_t *tq, task_func_t func, void *arg)
1067 1072  {
1068 1073          taskq_ent_t     *tqe;
1069 1074  
1070 1075          ASSERT(MUTEX_HELD(&tq->tq_lock));
1071 1076  
1072 1077          for (tqe = tq->tq_task.tqent_next; tqe != &tq->tq_task;
1073 1078              tqe = tqe->tqent_next)
1074 1079                  if ((tqe->tqent_func == func) && (tqe->tqent_arg == arg))
1075 1080                          return (1);
1076 1081          return (0);
1077 1082  }
1078 1083  
1079 1084  /*
1080 1085   * Dispatch a task "func(arg)" to a free entry of bucket b.
1081 1086   *
 1082 1087   * Assumes: no bucket locks are held.
1083 1088   *
1084 1089   * Returns: a pointer to an entry if dispatch was successful.
1085 1090   *          NULL if there are no free entries or if the bucket is suspended.
1086 1091   */
1087 1092  static taskq_ent_t *
1088 1093  taskq_bucket_dispatch(taskq_bucket_t *b, task_func_t func, void *arg)
1089 1094  {
1090 1095          taskq_ent_t *tqe;
1091 1096  
1092 1097          ASSERT(MUTEX_NOT_HELD(&b->tqbucket_lock));
1093 1098          ASSERT(func != NULL);
1094 1099  
1095 1100          mutex_enter(&b->tqbucket_lock);
1096 1101  
1097 1102          ASSERT(b->tqbucket_nfree != 0 || IS_EMPTY(b->tqbucket_freelist));
1098 1103          ASSERT(b->tqbucket_nfree == 0 || !IS_EMPTY(b->tqbucket_freelist));
1099 1104  
1100 1105          /*
 1101 1106           * Get an entry from the freelist if there is one.
1102 1107           * Schedule task into the entry.
1103 1108           */
1104 1109          if ((b->tqbucket_nfree != 0) &&
1105 1110              !(b->tqbucket_flags & TQBUCKET_SUSPEND)) {
1106 1111                  tqe = b->tqbucket_freelist.tqent_prev;
1107 1112  
1108 1113                  ASSERT(tqe != &b->tqbucket_freelist);
1109 1114                  ASSERT(tqe->tqent_thread != NULL);
1110 1115  
1111 1116                  tqe->tqent_prev->tqent_next = tqe->tqent_next;
1112 1117                  tqe->tqent_next->tqent_prev = tqe->tqent_prev;
1113 1118                  b->tqbucket_nalloc++;
1114 1119                  b->tqbucket_nfree--;
1115 1120                  tqe->tqent_func = func;
1116 1121                  tqe->tqent_arg = arg;
1117 1122                  TQ_STAT(b, tqs_hits);
1118 1123                  cv_signal(&tqe->tqent_cv);
1119 1124                  DTRACE_PROBE2(taskq__d__enqueue, taskq_bucket_t *, b,
1120 1125                      taskq_ent_t *, tqe);
1121 1126          } else {
1122 1127                  tqe = NULL;
1123 1128                  TQ_STAT(b, tqs_misses);
1124 1129          }
1125 1130          mutex_exit(&b->tqbucket_lock);
1126 1131          return (tqe);
1127 1132  }
1128 1133  
1129 1134  /*
1130 1135   * Dispatch a task.
1131 1136   *
1132 1137   * Assumes: func != NULL
1133 1138   *
1134 1139   * Returns: NULL if dispatch failed.
1135 1140   *          non-NULL if task dispatched successfully.
1136 1141   *          The actual return value is the pointer to the taskq entry that
1137 1142   *          was used to dispatch the task. This is useful for debugging.
1138 1143   */
1139 1144  taskqid_t
1140 1145  taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
1141 1146  {
1142 1147          taskq_bucket_t *bucket = NULL;  /* Which bucket needs extension */
1143 1148          taskq_ent_t *tqe = NULL;
1144 1149          taskq_ent_t *tqe1;
1145 1150          uint_t bsize;
1146 1151  
1147 1152          ASSERT(tq != NULL);
1148 1153          ASSERT(func != NULL);
1149 1154  
1150 1155          if (!(tq->tq_flags & TASKQ_DYNAMIC)) {
1151 1156                  /*
1152 1157                   * TQ_NOQUEUE flag can't be used with non-dynamic task queues.
1153 1158                   */
1154 1159                  ASSERT(!(flags & TQ_NOQUEUE));
1155 1160                  /*
1156 1161                   * Enqueue the task to the underlying queue.
1157 1162                   */
1158 1163                  mutex_enter(&tq->tq_lock);
1159 1164  
1160 1165                  TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags);
1161 1166  
1162 1167                  if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) {
1163 1168                          tq->tq_nomem++;
1164 1169                          mutex_exit(&tq->tq_lock);
1165 1170                          return (NULL);
1166 1171                  }
1167 1172                  /* Make sure we start without any flags */
1168 1173                  tqe->tqent_un.tqent_flags = 0;
1169 1174  
1170 1175                  if (flags & TQ_FRONT) {
1171 1176                          TQ_ENQUEUE_FRONT(tq, tqe, func, arg);
1172 1177                  } else {
1173 1178                          TQ_ENQUEUE(tq, tqe, func, arg);
1174 1179                  }
1175 1180                  mutex_exit(&tq->tq_lock);
1176 1181                  return ((taskqid_t)tqe);
1177 1182          }
1178 1183  
1179 1184          /*
1180 1185           * Dynamic taskq dispatching.
1181 1186           */
1182 1187          ASSERT(!(flags & (TQ_NOALLOC | TQ_FRONT)));
1183 1188          TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flags);
1184 1189  
1185 1190          bsize = tq->tq_nbuckets;
1186 1191  
1187 1192          if (bsize == 1) {
1188 1193                  /*
1189 1194                   * In a single-CPU case there is only one bucket, so get
1190 1195                   * entry directly from there.
1191 1196                   */
1192 1197                  if ((tqe = taskq_bucket_dispatch(tq->tq_buckets, func, arg))
1193 1198                      != NULL)
1194 1199                          return ((taskqid_t)tqe);        /* Fastpath */
1195 1200                  bucket = tq->tq_buckets;
1196 1201          } else {
1197 1202                  int loopcount;
1198 1203                  taskq_bucket_t *b;
1199 1204                  uintptr_t h = ((uintptr_t)CPU + (uintptr_t)arg) >> 3;
1200 1205  
1201 1206                  h = TQ_HASH(h);
1202 1207  
1203 1208                  /*
1204 1209                   * The 'bucket' points to the original bucket that we hit. If we
1205 1210                   * can't allocate from it, we search other buckets, but only
1206 1211                   * extend this one.
1207 1212                   */
1208 1213                  b = &tq->tq_buckets[h & (bsize - 1)];
1209 1214                  ASSERT(b->tqbucket_taskq == tq);        /* Sanity check */
1210 1215  
1211 1216                  /*
1212 1217                   * Do a quick check before grabbing the lock. If the bucket does
1213 1218                   * not have free entries now, chances are very small that it
1214 1219                   * will after we take the lock, so we just skip it.
1215 1220                   */
1216 1221                  if (b->tqbucket_nfree != 0) {
1217 1222                          if ((tqe = taskq_bucket_dispatch(b, func, arg)) != NULL)
1218 1223                                  return ((taskqid_t)tqe);        /* Fastpath */
1219 1224                  } else {
1220 1225                          TQ_STAT(b, tqs_misses);
1221 1226                  }
1222 1227  
1223 1228                  bucket = b;
1224 1229                  loopcount = MIN(taskq_search_depth, bsize);
1225 1230                  /*
1226 1231                   * If bucket dispatch failed, search loopcount number of buckets
1227 1232                   * before we give up and fail.
1228 1233                   */
1229 1234                  do {
1230 1235                          b = &tq->tq_buckets[++h & (bsize - 1)];
1231 1236                          ASSERT(b->tqbucket_taskq == tq);  /* Sanity check */
1232 1237                          loopcount--;
1233 1238  
1234 1239                          if (b->tqbucket_nfree != 0) {
1235 1240                                  tqe = taskq_bucket_dispatch(b, func, arg);
1236 1241                          } else {
1237 1242                                  TQ_STAT(b, tqs_misses);
1238 1243                          }
1239 1244                  } while ((tqe == NULL) && (loopcount > 0));
1240 1245          }
1241 1246  
1242 1247          /*
1243 1248           * At this point we have either scheduled a task (tqe != NULL) or
1244 1249           * failed (tqe == NULL). Try to recover from failures.
1245 1250           */
1246 1251  
1247 1252          /*
1248 1253           * For KM_SLEEP dispatches, try to extend the bucket and retry dispatch.
1249 1254           */
1250 1255          if ((tqe == NULL) && !(flags & TQ_NOSLEEP)) {
1251 1256                  /*
1252 1257                   * taskq_bucket_extend() may fail to do anything, but this is
1253 1258                   * fine - we deal with it later. If the bucket was successfully
1254 1259                   * extended, there is a good chance that taskq_bucket_dispatch()
1255 1260                   * will get this new entry, unless someone is racing with us and
1256 1261                   * stealing the new entry from under our nose.
1257 1262                   * taskq_bucket_extend() may sleep.
1258 1263                   */
1259 1264                  taskq_bucket_extend(bucket);
1260 1265                  TQ_STAT(bucket, tqs_disptcreates);
1261 1266                  if ((tqe = taskq_bucket_dispatch(bucket, func, arg)) != NULL)
1262 1267                          return ((taskqid_t)tqe);
1263 1268          }
1264 1269  
1265 1270          ASSERT(bucket != NULL);
1266 1271  
1267 1272          /*
1268 1273           * Since there are not enough free entries in the bucket, add a
1269 1274           * taskq entry to extend it in the background using the backing queue
1270 1275           * (unless we already have a taskq entry to perform that extension).
1271 1276           */
1272 1277          mutex_enter(&tq->tq_lock);
1273 1278          if (!taskq_ent_exists(tq, taskq_bucket_extend, bucket)) {
1274 1279                  if ((tqe1 = taskq_ent_alloc(tq, TQ_NOSLEEP)) != NULL) {
1275 1280                          TQ_ENQUEUE_FRONT(tq, tqe1, taskq_bucket_extend, bucket);
1276 1281                  } else {
1277 1282                          tq->tq_nomem++;
1278 1283                  }
1279 1284          }
1280 1285  
1281 1286          /*
1282 1287           * Dispatch failed and we can't find an entry to schedule a task.
1283 1288           * Revert to the backing queue unless TQ_NOQUEUE was requested.
1284 1289           */
1285 1290          if ((tqe == NULL) && !(flags & TQ_NOQUEUE)) {
1286 1291                  if ((tqe = taskq_ent_alloc(tq, flags)) != NULL) {
1287 1292                          TQ_ENQUEUE(tq, tqe, func, arg);
1288 1293                  } else {
1289 1294                          tq->tq_nomem++;
1290 1295                  }
1291 1296          }
1292 1297          mutex_exit(&tq->tq_lock);
1293 1298  
1294 1299          return ((taskqid_t)tqe);
1295 1300  }
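
As a hedged illustration of the return-value contract above, a caller might check for the NULL failure return and fall back to running the task inline. my_task() and my_dispatch() below are hypothetical and assume the usual <sys/taskq.h> declarations.

        static void
        my_task(void *arg)
        {
                /* ... the deferred work ... */
        }

        static void
        my_dispatch(taskq_t *tq, void *arg)
        {
                /* taskq_dispatch() returns NULL when no entry could be allocated. */
                if (taskq_dispatch(tq, my_task, arg, TQ_NOSLEEP) == NULL)
                        my_task(arg);   /* fall back to running it inline */
        }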
1296 1301  
1297 1302  void
1298 1303  taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
1299 1304      taskq_ent_t *tqe)
1300 1305  {
1301 1306          ASSERT(func != NULL);
1302 1307          ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
1303 1308  
1304 1309          /*
1305 1310           * Mark it as a prealloc'd task.  This is important
1306 1311           * to ensure that we don't free it later.
1307 1312           */
1308 1313          tqe->tqent_un.tqent_flags |= TQENT_FLAG_PREALLOC;
1309 1314          /*
1310 1315           * Enqueue the task to the underlying queue.
1311 1316           */
1312 1317          mutex_enter(&tq->tq_lock);
1313 1318  
1314 1319          if (flags & TQ_FRONT) {
1315 1320                  TQ_ENQUEUE_FRONT(tq, tqe, func, arg);
1316 1321          } else {
1317 1322                  TQ_ENQUEUE(tq, tqe, func, arg);
1318 1323          }
1319 1324          mutex_exit(&tq->tq_lock);
1320 1325  }
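
A minimal, hedged sketch of a taskq_dispatch_ent() caller that embeds the taskq_ent_t in its own structure, assuming the containing structure (and thus the entry) is zeroed at allocation time and the queue is not TASKQ_DYNAMIC; my_job_t, my_job_func() and my_job_dispatch() are hypothetical.

        typedef struct my_job {
                taskq_ent_t     mj_tqent;       /* caller-owned, prealloc'd entry */
                int             mj_data;
        } my_job_t;

        static void
        my_job_func(void *arg)
        {
                my_job_t *mj = arg;

                mj->mj_data++;          /* placeholder for real work */
        }

        static void
        my_job_dispatch(taskq_t *tq, my_job_t *mj)
        {
                /* No allocation is needed for a prealloc'd entry. */
                taskq_dispatch_ent(tq, my_job_func, mj, 0, &mj->mj_tqent);
        }

Once my_job_func() has been called, the entry belongs to the caller again, matching the ownership note in the worker-thread code below.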
1321 1326  
     1327 +/*
     1328 + * Allow our caller to ask if there are tasks pending on the queue.
     1329 + */
     1330 +boolean_t
     1331 +taskq_empty(taskq_t *tq)
     1332 +{
     1333 +        boolean_t rv;
     1334 +
     1335 +        ASSERT3P(tq, !=, curthread->t_taskq);
     1336 +        mutex_enter(&tq->tq_lock);
     1337 +        rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0);
     1338 +        mutex_exit(&tq->tq_lock);
     1339 +
     1340 +        return (rv);
     1341 +}
     1342 +
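A hedged sketch of the kind of check the new taskq_empty() enables: polling until the queue has drained without blocking on tq_wait_cv. The helper name and the poll interval are arbitrary, and, per the ASSERT3P above, it must not be called from a task running on the same queue.

        static void
        my_wait_until_idle(taskq_t *tq)
        {
                while (!taskq_empty(tq))
                        delay(hz / 10);         /* arbitrary poll interval */
        }
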
1322 1343  /*
1323 1344   * Wait for all pending tasks to complete.
1324 1345   * Calling taskq_wait from a task will cause deadlock.
1325 1346   */
1326 1347  void
1327 1348  taskq_wait(taskq_t *tq)
1328 1349  {
1329 1350          ASSERT(tq != curthread->t_taskq);
1330 1351  
1331 1352          mutex_enter(&tq->tq_lock);
1332 1353          while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
1333 1354                  cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
1334 1355          mutex_exit(&tq->tq_lock);
1335 1356  
1336 1357          if (tq->tq_flags & TASKQ_DYNAMIC) {
1337 1358                  taskq_bucket_t *b = tq->tq_buckets;
1338 1359                  int bid = 0;
1339 1360                  for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1340 1361                          mutex_enter(&b->tqbucket_lock);
1341 1362                          while (b->tqbucket_nalloc > 0)
1342 1363                                  cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
1343 1364                          mutex_exit(&b->tqbucket_lock);
1344 1365                  }
1345 1366          }
1346 1367  }
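
As a hedged usage sketch, a caller might dispatch a batch of tasks and then use taskq_wait() as a barrier; my_run_batch() is hypothetical and reuses the my_task() function from the taskq_dispatch() sketch above.

        static void
        my_run_batch(taskq_t *tq, void **args, int n)
        {
                int i;

                for (i = 0; i < n; i++)
                        (void) taskq_dispatch(tq, my_task, args[i], TQ_SLEEP);

                /* Blocks until the queue is idle; never call from a task on tq. */
                taskq_wait(tq);
        }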
1347 1368  
1348 1369  /*
1349 1370   * Suspend execution of tasks.
1350 1371   *
1351 1372   * Tasks in the queue part will be suspended immediately upon return from this
1352 1373   * function. Pending tasks in the dynamic part will continue to execute, but all
1353 1374   * new tasks will be suspended.
1354 1375   */
1355 1376  void
1356 1377  taskq_suspend(taskq_t *tq)
1357 1378  {
1358 1379          rw_enter(&tq->tq_threadlock, RW_WRITER);
1359 1380  
1360 1381          if (tq->tq_flags & TASKQ_DYNAMIC) {
1361 1382                  taskq_bucket_t *b = tq->tq_buckets;
1362 1383                  int bid = 0;
1363 1384                  for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1364 1385                          mutex_enter(&b->tqbucket_lock);
1365 1386                          b->tqbucket_flags |= TQBUCKET_SUSPEND;
1366 1387                          mutex_exit(&b->tqbucket_lock);
1367 1388                  }
1368 1389          }
1369 1390          /*
1370 1391           * Mark task queue as being suspended. Needed for taskq_suspended().
1371 1392           */
1372 1393          mutex_enter(&tq->tq_lock);
1373 1394          ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED));
1374 1395          tq->tq_flags |= TASKQ_SUSPENDED;
1375 1396          mutex_exit(&tq->tq_lock);
1376 1397  }
1377 1398  
1378 1399  /*
1379 1400   * returns: 1 if tq is suspended, 0 otherwise.
1380 1401   */
1381 1402  int
1382 1403  taskq_suspended(taskq_t *tq)
1383 1404  {
1384 1405          return ((tq->tq_flags & TASKQ_SUSPENDED) != 0);
1385 1406  }
1386 1407  
1387 1408  /*
1388 1409   * Resume taskq execution.
1389 1410   */
1390 1411  void
1391 1412  taskq_resume(taskq_t *tq)
1392 1413  {
1393 1414          ASSERT(RW_WRITE_HELD(&tq->tq_threadlock));
1394 1415  
1395 1416          if (tq->tq_flags & TASKQ_DYNAMIC) {
1396 1417                  taskq_bucket_t *b = tq->tq_buckets;
1397 1418                  int bid = 0;
1398 1419                  for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1399 1420                          mutex_enter(&b->tqbucket_lock);
1400 1421                          b->tqbucket_flags &= ~TQBUCKET_SUSPEND;
1401 1422                          mutex_exit(&b->tqbucket_lock);
1402 1423                  }
1403 1424          }
1404 1425          mutex_enter(&tq->tq_lock);
1405 1426          ASSERT(tq->tq_flags & TASKQ_SUSPENDED);
1406 1427          tq->tq_flags &= ~TASKQ_SUSPENDED;
1407 1428          mutex_exit(&tq->tq_lock);
1408 1429  
1409 1430          rw_exit(&tq->tq_threadlock);
1410 1431  }
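
A hedged sketch of the suspend/resume pairing: taskq_suspend() returns with tq_threadlock held as writer and taskq_resume() drops it, so the two calls must bracket the quiesced section on the same thread. The helper below is hypothetical.

        static void
        my_quiesce_and_reconfigure(taskq_t *tq)
        {
                taskq_suspend(tq);      /* returns with tq_threadlock held */
                ASSERT(taskq_suspended(tq));

                /* ... reconfigure whatever required the queue to be quiesced ... */

                taskq_resume(tq);       /* drops tq_threadlock */
        }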
1411 1432  
1412 1433  int
1413 1434  taskq_member(taskq_t *tq, kthread_t *thread)
1414 1435  {
1415 1436          return (thread->t_taskq == tq);
1416 1437  }
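
taskq_member() is mostly useful in assertions; below is a hedged sketch of a task function that verifies it is running on the queue it was dispatched to (my_tq is a hypothetical queue created elsewhere).

        static taskq_t *my_tq;          /* hypothetical, created elsewhere */

        static void
        my_task_checked(void *arg)
        {
                /* t_taskq is set for us by taskq_thread()/taskq_bucket_extend(). */
                ASSERT(taskq_member(my_tq, curthread));
                /* ... */
        }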
1417 1438  
1418 1439  /*
1419 1440   * Creates a thread in the taskq.  We only allow one outstanding create at
1420 1441   * a time.  We drop and reacquire the tq_lock in order to avoid blocking other
1421 1442   * taskq activity while thread_create() or lwp_kernel_create() run.
1422 1443   *
1423 1444   * The first time we're called, we do some additional setup, and do not
1424 1445   * return until there are enough threads to start servicing requests.
1425 1446   */
1426 1447  static void
1427 1448  taskq_thread_create(taskq_t *tq)
1428 1449  {
1429 1450          kthread_t       *t;
1430 1451          const boolean_t first = (tq->tq_nthreads == 0);
1431 1452  
1432 1453          ASSERT(MUTEX_HELD(&tq->tq_lock));
1433 1454          ASSERT(tq->tq_flags & TASKQ_CHANGING);
1434 1455          ASSERT(tq->tq_nthreads < tq->tq_nthreads_target);
1435 1456          ASSERT(!(tq->tq_flags & TASKQ_THREAD_CREATED));
1436 1457  
1437 1458  
1438 1459          tq->tq_flags |= TASKQ_THREAD_CREATED;
1439 1460          tq->tq_active++;
1440 1461          mutex_exit(&tq->tq_lock);
1441 1462  
1442 1463          /*
1443 1464           * With TASKQ_DUTY_CYCLE the new thread must have an LWP
1444 1465           * as explained in ../disp/sysdc.c (for the msacct data).
1445 1466           * Otherwise simple kthreads are preferred.
1446 1467           */
1447 1468          if ((tq->tq_flags & TASKQ_DUTY_CYCLE) != 0) {
1448 1469                  /* Enforced in taskq_create_common */
1449 1470                  ASSERT3P(tq->tq_proc, !=, &p0);
1450 1471                  t = lwp_kernel_create(tq->tq_proc, taskq_thread, tq, TS_RUN,
1451 1472                      tq->tq_pri);
1452 1473          } else {
1453 1474                  t = thread_create(NULL, 0, taskq_thread, tq, 0, tq->tq_proc,
1454 1475                      TS_RUN, tq->tq_pri);
1455 1476          }
1456 1477  
1457 1478          if (!first) {
1458 1479                  mutex_enter(&tq->tq_lock);
1459 1480                  return;
1460 1481          }
1461 1482  
1462 1483          /*
1463 1484           * We know the thread cannot go away, since tq cannot be
1464 1485           * destroyed until creation has completed.  We can therefore
1465 1486           * safely dereference t.
1466 1487           */
1467 1488          if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
1468 1489                  taskq_cpupct_install(tq, t->t_cpupart);
1469 1490          }
1470 1491          mutex_enter(&tq->tq_lock);
1471 1492  
1472 1493          /* Wait until we can service requests. */
1473 1494          while (tq->tq_nthreads != tq->tq_nthreads_target &&
1474 1495              tq->tq_nthreads < TASKQ_CREATE_ACTIVE_THREADS) {
1475 1496                  cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
1476 1497          }
1477 1498  }
1478 1499  
1479 1500  /*
1480 1501   * Common "sleep taskq thread" function, which handles CPR stuff, as well
1481 1502   * as giving a nice common point for debuggers to find inactive threads.
1482 1503   */
1483 1504  static clock_t
1484 1505  taskq_thread_wait(taskq_t *tq, kmutex_t *mx, kcondvar_t *cv,
1485 1506      callb_cpr_t *cprinfo, clock_t timeout)
1486 1507  {
1487 1508          clock_t ret = 0;
1488 1509  
1489 1510          if (!(tq->tq_flags & TASKQ_CPR_SAFE)) {
1490 1511                  CALLB_CPR_SAFE_BEGIN(cprinfo);
1491 1512          }
1492 1513          if (timeout < 0)
1493 1514                  cv_wait(cv, mx);
1494 1515          else
1495 1516                  ret = cv_reltimedwait(cv, mx, timeout, TR_CLOCK_TICK);
1496 1517  
1497 1518          if (!(tq->tq_flags & TASKQ_CPR_SAFE)) {
1498 1519                  CALLB_CPR_SAFE_END(cprinfo, mx);
1499 1520          }
1500 1521  
1501 1522          return (ret);
1502 1523  }
1503 1524  
1504 1525  /*
1505 1526   * Worker thread for processing task queue.
1506 1527   */
1507 1528  static void
1508 1529  taskq_thread(void *arg)
1509 1530  {
1510 1531          int thread_id;
1511 1532  
1512 1533          taskq_t *tq = arg;
1513 1534          taskq_ent_t *tqe;
1514 1535          callb_cpr_t cprinfo;
1515 1536          hrtime_t start, end;
1516 1537          boolean_t freeit;
1517 1538  
1518 1539          curthread->t_taskq = tq;        /* mark ourselves for taskq_member() */
1519 1540  
1520 1541          if (curproc != &p0 && (tq->tq_flags & TASKQ_DUTY_CYCLE)) {
1521 1542                  sysdc_thread_enter(curthread, tq->tq_DC,
1522 1543                      (tq->tq_flags & TASKQ_DC_BATCH) ? SYSDC_THREAD_BATCH : 0);
1523 1544          }
1524 1545  
1525 1546          if (tq->tq_flags & TASKQ_CPR_SAFE) {
1526 1547                  CALLB_CPR_INIT_SAFE(curthread, tq->tq_name);
1527 1548          } else {
1528 1549                  CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr,
1529 1550                      tq->tq_name);
1530 1551          }
1531 1552          mutex_enter(&tq->tq_lock);
1532 1553          thread_id = ++tq->tq_nthreads;
1533 1554          ASSERT(tq->tq_flags & TASKQ_THREAD_CREATED);
1534 1555          ASSERT(tq->tq_flags & TASKQ_CHANGING);
1535 1556          tq->tq_flags &= ~TASKQ_THREAD_CREATED;
1536 1557  
1537 1558          VERIFY3S(thread_id, <=, tq->tq_nthreads_max);
1538 1559  
1539 1560          if (tq->tq_nthreads_max == 1)
1540 1561                  tq->tq_thread = curthread;
1541 1562          else
1542 1563                  tq->tq_threadlist[thread_id - 1] = curthread;
1543 1564  
1544 1565          /* Allow taskq_create_common()'s taskq_thread_create() to return. */
1545 1566          if (tq->tq_nthreads == TASKQ_CREATE_ACTIVE_THREADS)
1546 1567                  cv_broadcast(&tq->tq_wait_cv);
1547 1568  
1548 1569          for (;;) {
1549 1570                  if (tq->tq_flags & TASKQ_CHANGING) {
1550 1571                          /* See if we're no longer needed */
1551 1572                          if (thread_id > tq->tq_nthreads_target) {
1552 1573                                  /*
1553 1574                                   * To preserve the one-to-one mapping between
1554 1575                                   * thread_id and thread, we must exit from
1555 1576                                   * highest thread ID to lowest.
1556 1577                                   *
1557 1578                                   * However, if everyone is exiting, the order
1558 1579                                   * doesn't matter, so just exit immediately.
1559 1580                                   * (this is safe, since you must wait for
1560 1581                                   * nthreads to reach 0 after setting
1561 1582                                   * tq_nthreads_target to 0)
1562 1583                                   */
1563 1584                                  if (thread_id == tq->tq_nthreads ||
1564 1585                                      tq->tq_nthreads_target == 0)
1565 1586                                          break;
1566 1587  
1567 1588                                  /* Wait for higher thread_ids to exit */
1568 1589                                  (void) taskq_thread_wait(tq, &tq->tq_lock,
1569 1590                                      &tq->tq_exit_cv, &cprinfo, -1);
1570 1591                                  continue;
1571 1592                          }
1572 1593  
1573 1594                          /*
1574 1595                           * If no thread is starting taskq_thread(), we can
1575 1596                           * do some bookkeeping.
1576 1597                           */
1577 1598                          if (!(tq->tq_flags & TASKQ_THREAD_CREATED)) {
1578 1599                                  /* Check if we've reached our target */
1579 1600                                  if (tq->tq_nthreads == tq->tq_nthreads_target) {
1580 1601                                          tq->tq_flags &= ~TASKQ_CHANGING;
1581 1602                                          cv_broadcast(&tq->tq_wait_cv);
1582 1603                                  }
1583 1604                                  /* Check if we need to create a thread */
1584 1605                                  if (tq->tq_nthreads < tq->tq_nthreads_target) {
1585 1606                                          taskq_thread_create(tq);
1586 1607                                          continue; /* tq_lock was dropped */
1587 1608                                  }
1588 1609                          }
1589 1610                  }
1590 1611                  if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) {
1591 1612                          if (--tq->tq_active == 0)
1592 1613                                  cv_broadcast(&tq->tq_wait_cv);
1593 1614                          (void) taskq_thread_wait(tq, &tq->tq_lock,
1594 1615                              &tq->tq_dispatch_cv, &cprinfo, -1);
1595 1616                          tq->tq_active++;
1596 1617                          continue;
1597 1618                  }
1598 1619  
1599 1620                  tqe->tqent_prev->tqent_next = tqe->tqent_next;
1600 1621                  tqe->tqent_next->tqent_prev = tqe->tqent_prev;
1601 1622                  mutex_exit(&tq->tq_lock);
1602 1623  
1603 1624                  /*
1604 1625                   * For prealloc'd tasks, we don't free anything.  We
1605 1626                   * have to check this now, because once we call the
1606 1627                   * function for a prealloc'd taskq entry, we can't touch
1607 1628                   * the tqent any longer (calling the function returns
1608 1629                   * ownership of the tqent back to the caller of
1609 1630                   * taskq_dispatch).
1610 1631                   */
1611 1632                  if ((!(tq->tq_flags & TASKQ_DYNAMIC)) &&
1612 1633                      (tqe->tqent_un.tqent_flags & TQENT_FLAG_PREALLOC)) {
1613 1634                          /* clear pointers to assist assertion checks */
1614 1635                          tqe->tqent_next = tqe->tqent_prev = NULL;
1615 1636                          freeit = B_FALSE;
1616 1637                  } else {
1617 1638                          freeit = B_TRUE;
1618 1639                  }
1619 1640  
1620 1641                  rw_enter(&tq->tq_threadlock, RW_READER);
1621 1642                  start = gethrtime();
1622 1643                  DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
1623 1644                      taskq_ent_t *, tqe);
1624 1645                  tqe->tqent_func(tqe->tqent_arg);
1625 1646                  DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq,
1626 1647                      taskq_ent_t *, tqe);
1627 1648                  end = gethrtime();
1628 1649                  rw_exit(&tq->tq_threadlock);
1629 1650  
1630 1651                  mutex_enter(&tq->tq_lock);
1631 1652                  tq->tq_totaltime += end - start;
1632 1653                  tq->tq_executed++;
1633 1654  
1634 1655                  if (freeit)
1635 1656                          taskq_ent_free(tq, tqe);
1636 1657          }
1637 1658  
1638 1659          if (tq->tq_nthreads_max == 1)
1639 1660                  tq->tq_thread = NULL;
1640 1661          else
1641 1662                  tq->tq_threadlist[thread_id - 1] = NULL;
1642 1663  
1643 1664          /* We're exiting, and therefore no longer active */
1644 1665          ASSERT(tq->tq_active > 0);
1645 1666          tq->tq_active--;
1646 1667  
1647 1668          ASSERT(tq->tq_nthreads > 0);
1648 1669          tq->tq_nthreads--;
1649 1670  
1650 1671          /* Wake up anyone waiting for us to exit */
1651 1672          cv_broadcast(&tq->tq_exit_cv);
1652 1673          if (tq->tq_nthreads == tq->tq_nthreads_target) {
1653 1674                  if (!(tq->tq_flags & TASKQ_THREAD_CREATED))
1654 1675                          tq->tq_flags &= ~TASKQ_CHANGING;
1655 1676  
1656 1677                  cv_broadcast(&tq->tq_wait_cv);
1657 1678          }
1658 1679  
1659 1680          ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE));
1660 1681          CALLB_CPR_EXIT(&cprinfo);               /* drops tq->tq_lock */
1661 1682          if (curthread->t_lwp != NULL) {
1662 1683                  mutex_enter(&curproc->p_lock);
1663 1684                  lwp_exit();
1664 1685          } else {
1665 1686                  thread_exit();
1666 1687          }
1667 1688  }
1668 1689  
1669 1690  /*
1670 1691   * Worker per-entry thread for dynamic dispatches.
1671 1692   */
1672 1693  static void
1673 1694  taskq_d_thread(taskq_ent_t *tqe)
1674 1695  {
1675 1696          taskq_bucket_t  *bucket = tqe->tqent_un.tqent_bucket;
1676 1697          taskq_t         *tq = bucket->tqbucket_taskq;
1677 1698          kmutex_t        *lock = &bucket->tqbucket_lock;
1678 1699          kcondvar_t      *cv = &tqe->tqent_cv;
1679 1700          callb_cpr_t     cprinfo;
1680 1701          clock_t         w;
1681 1702  
1682 1703          CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, tq->tq_name);
1683 1704  
1684 1705          mutex_enter(lock);
1685 1706  
1686 1707          for (;;) {
1687 1708                  /*
1688 1709                   * If a task is scheduled (func != NULL), execute it, otherwise
1689 1710                   * sleep, waiting for a job.
1690 1711                   */
1691 1712                  if (tqe->tqent_func != NULL) {
1692 1713                          hrtime_t        start;
1693 1714                          hrtime_t        end;
1694 1715  
1695 1716                          ASSERT(bucket->tqbucket_nalloc > 0);
1696 1717  
1697 1718                          /*
1698 1719                           * It is possible to free the entry right away before
1699 1720                           * actually executing the task so that subsequent
1700 1721                           * dispatches may immediately reuse it. But this
1701 1722                           * effectively creates a two-deep queue in the entry
1702 1723                           * and may lead to a deadlock if the execution of the
1703 1724                           * current task depends on the execution of the next
1704 1725                           * scheduled task. So, we keep the entry busy until the
1705 1726                           * task is processed.
1706 1727                           */
1707 1728  
1708 1729                          mutex_exit(lock);
1709 1730                          start = gethrtime();
1710 1731                          DTRACE_PROBE3(taskq__d__exec__start, taskq_t *, tq,
1711 1732                              taskq_bucket_t *, bucket, taskq_ent_t *, tqe);
1712 1733                          tqe->tqent_func(tqe->tqent_arg);
1713 1734                          DTRACE_PROBE3(taskq__d__exec__end, taskq_t *, tq,
1714 1735                              taskq_bucket_t *, bucket, taskq_ent_t *, tqe);
1715 1736                          end = gethrtime();
1716 1737                          mutex_enter(lock);
1717 1738                          bucket->tqbucket_totaltime += end - start;
1718 1739  
1719 1740                          /*
1720 1741                           * Return the entry to the bucket free list.
1721 1742                           */
1722 1743                          tqe->tqent_func = NULL;
1723 1744                          TQ_APPEND(bucket->tqbucket_freelist, tqe);
1724 1745                          bucket->tqbucket_nalloc--;
1725 1746                          bucket->tqbucket_nfree++;
1726 1747                          ASSERT(!IS_EMPTY(bucket->tqbucket_freelist));
1727 1748                          /*
1728 1749                           * taskq_wait() waits for nalloc to drop to zero on
1729 1750                           * tqbucket_cv.
1730 1751                           */
1731 1752                          cv_signal(&bucket->tqbucket_cv);
1732 1753                  }
1733 1754  
1734 1755                  /*
1735 1756                   * At this point the entry must be in the bucket free list -
1736 1757                   * either because it was there initially or because it just
1737 1758                   * finished executing a task and put itself on the free list.
1738 1759                   */
1739 1760                  ASSERT(bucket->tqbucket_nfree > 0);
1740 1761                  /*
1741 1762                   * Go to sleep unless we are closing.
1742 1763                   * If a thread is sleeping too long, it dies.
1743 1764                   */
1744 1765                  if (! (bucket->tqbucket_flags & TQBUCKET_CLOSE)) {
1745 1766                          w = taskq_thread_wait(tq, lock, cv,
1746 1767                              &cprinfo, taskq_thread_timeout * hz);
1747 1768                  }
1748 1769  
1749 1770                  /*
1750 1771                   * At this point we may be in two different states:
1751 1772                   *
1752 1773                   * (1) tqent_func is set which means that a new task is
1753 1774                   *      dispatched and we need to execute it.
1754 1775                   *
1755 1776                   * (2) Thread is sleeping for too long or we are closing. In
1756 1777                   *      both cases destroy the thread and the entry.
1757 1778                   */
1758 1779  
1759 1780                  /* If func is NULL we should be on the freelist. */
1760 1781                  ASSERT((tqe->tqent_func != NULL) ||
1761 1782                      (bucket->tqbucket_nfree > 0));
1762 1783                  /* If func is non-NULL we should be allocated */
1763 1784                  ASSERT((tqe->tqent_func == NULL) ||
1764 1785                      (bucket->tqbucket_nalloc > 0));
1765 1786  
1766 1787                  /* Check freelist consistency */
1767 1788                  ASSERT((bucket->tqbucket_nfree > 0) ||
1768 1789                      IS_EMPTY(bucket->tqbucket_freelist));
1769 1790                  ASSERT((bucket->tqbucket_nfree == 0) ||
1770 1791                      !IS_EMPTY(bucket->tqbucket_freelist));
1771 1792  
1772 1793                  if ((tqe->tqent_func == NULL) &&
1773 1794                      ((w == -1) || (bucket->tqbucket_flags & TQBUCKET_CLOSE))) {
1774 1795                          /*
1775 1796                           * This thread is sleeping for too long or we are
1776 1797                           * closing - time to die.
1777 1798                           * Thread creation/destruction happens rarely,
1778 1799                           * so grabbing the lock is not a big performance issue.
1779 1800                           * The bucket lock is dropped by CALLB_CPR_EXIT().
1780 1801                           */
1781 1802  
1782 1803                          /* Remove the entry from the free list. */
1783 1804                          tqe->tqent_prev->tqent_next = tqe->tqent_next;
1784 1805                          tqe->tqent_next->tqent_prev = tqe->tqent_prev;
1785 1806                          ASSERT(bucket->tqbucket_nfree > 0);
1786 1807                          bucket->tqbucket_nfree--;
1787 1808  
1788 1809                          TQ_STAT(bucket, tqs_tdeaths);
1789 1810                          cv_signal(&bucket->tqbucket_cv);
1790 1811                          tqe->tqent_thread = NULL;
1791 1812                          mutex_enter(&tq->tq_lock);
1792 1813                          tq->tq_tdeaths++;
1793 1814                          mutex_exit(&tq->tq_lock);
1794 1815                          CALLB_CPR_EXIT(&cprinfo);
1795 1816                          kmem_cache_free(taskq_ent_cache, tqe);
1796 1817                          thread_exit();
1797 1818                  }
1798 1819          }
1799 1820  }
1800 1821  
1801 1822  
1802 1823  /*
1803 1824   * Taskq creation. May sleep for memory.
1804 1825   * Always use automatically generated instances to avoid kstat name space
1805 1826   * collisions.
1806 1827   */
1807 1828  
1808 1829  taskq_t *
1809 1830  taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
1810 1831      int maxalloc, uint_t flags)
1811 1832  {
1812 1833          ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0);
1813 1834  
1814 1835          return (taskq_create_common(name, 0, nthreads, pri, minalloc,
1815 1836              maxalloc, &p0, 0, flags | TASKQ_NOINSTANCE));
1816 1837  }
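
A hedged creation sketch: four worker threads at minclsyspri, a prepopulated free list of four entries, and an effectively unbounded maxalloc. Every value here is illustrative rather than a recommendation, and the name will be canonicalized by strident_canon() in taskq_create_common().

        static taskq_t *
        my_taskq_init(void)
        {
                return (taskq_create("my_taskq", 4, minclsyspri,
                    4, INT_MAX, TASKQ_PREPOPULATE));
        }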
1817 1838  
1818 1839  /*
1819 1840   * Create an instance of task queue. It is legal to create task queues with the
1820 1841   * same name and different instances.
1821 1842   *
1822 1843   * taskq_create_instance is used by ddi_taskq_create() where it gets the
1823 1844   * instance from ddi_get_instance(). In some cases the instance is not
1824 1845   * initialized and is set to -1. This case is handled as if no instance was
1825 1846   * passed at all.
1826 1847   */
1827 1848  taskq_t *
1828 1849  taskq_create_instance(const char *name, int instance, int nthreads, pri_t pri,
1829 1850      int minalloc, int maxalloc, uint_t flags)
1830 1851  {
1831 1852          ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0);
1832 1853          ASSERT((instance >= 0) || (instance == -1));
1833 1854  
1834 1855          if (instance < 0) {
1835 1856                  flags |= TASKQ_NOINSTANCE;
1836 1857          }
1837 1858  
1838 1859          return (taskq_create_common(name, instance, nthreads,
1839 1860              pri, minalloc, maxalloc, &p0, 0, flags));
1840 1861  }
1841 1862  
1842 1863  taskq_t *
1843 1864  taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc,
1844 1865      int maxalloc, proc_t *proc, uint_t flags)
1845 1866  {
1846 1867          ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0);
1847 1868          ASSERT(proc->p_flag & SSYS);
1848 1869  
1849 1870          return (taskq_create_common(name, 0, nthreads, pri, minalloc,
1850 1871              maxalloc, proc, 0, flags | TASKQ_NOINSTANCE));
1851 1872  }
1852 1873  
1853 1874  taskq_t *
1854 1875  taskq_create_sysdc(const char *name, int nthreads, int minalloc,
1855 1876      int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
1856 1877  {
1857 1878          ASSERT((flags & ~TASKQ_INTERFACE_FLAGS) == 0);
1858 1879          ASSERT(proc->p_flag & SSYS);
1859 1880  
1860 1881          return (taskq_create_common(name, 0, nthreads, minclsyspri, minalloc,
1861 1882              maxalloc, proc, dc, flags | TASKQ_NOINSTANCE | TASKQ_DUTY_CYCLE));
1862 1883  }
1863 1884  
1864 1885  static taskq_t *
1865 1886  taskq_create_common(const char *name, int instance, int nthreads, pri_t pri,
1866 1887      int minalloc, int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
1867 1888  {
1868 1889          taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP);
1869 1890          uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
1870 1891          uint_t bsize;   /* # of buckets - always power of 2 */
1871 1892          int max_nthreads;
1872 1893  
1873 1894          /*
1874 1895           * TASKQ_DYNAMIC, TASKQ_CPR_SAFE and TASKQ_THREADS_CPU_PCT are all
1875 1896           * mutually incompatible.
1876 1897           */
1877 1898          IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_CPR_SAFE));
1878 1899          IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_THREADS_CPU_PCT));
1879 1900          IMPLY((flags & TASKQ_CPR_SAFE), !(flags & TASKQ_THREADS_CPU_PCT));
1880 1901  
1881 1902          /* Cannot have DYNAMIC with DUTY_CYCLE */
1882 1903          IMPLY((flags & TASKQ_DYNAMIC), !(flags & TASKQ_DUTY_CYCLE));
1883 1904  
1884 1905          /* Cannot have DUTY_CYCLE with a p0 kernel process */
1885 1906          IMPLY((flags & TASKQ_DUTY_CYCLE), proc != &p0);
1886 1907  
1887 1908          /* Cannot have DC_BATCH without DUTY_CYCLE */
1888 1909          ASSERT((flags & (TASKQ_DUTY_CYCLE|TASKQ_DC_BATCH)) != TASKQ_DC_BATCH);
1889 1910  
1890 1911          ASSERT(proc != NULL);
1891 1912  
1892 1913          bsize = 1 << (highbit(ncpus) - 1);
1893 1914          ASSERT(bsize >= 1);
1894 1915          bsize = MIN(bsize, taskq_maxbuckets);
1895 1916  
1896 1917          if (flags & TASKQ_DYNAMIC) {
1897 1918                  ASSERT3S(nthreads, >=, 1);
1898 1919                  tq->tq_maxsize = nthreads;
1899 1920  
1900 1921                  /* For dynamic task queues use just one backup thread */
1901 1922                  nthreads = max_nthreads = 1;
1902 1923  
1903 1924          } else if (flags & TASKQ_THREADS_CPU_PCT) {
1904 1925                  uint_t pct;
1905 1926                  ASSERT3S(nthreads, >=, 0);
1906 1927                  pct = nthreads;
1907 1928  
1908 1929                  if (pct > taskq_cpupct_max_percent)
1909 1930                          pct = taskq_cpupct_max_percent;
1910 1931  
1911 1932                  /*
1912 1933                   * If you're using THREADS_CPU_PCT, the process for the
1913 1934                   * taskq threads must be curproc.  This allows any pset
1914 1935                   * binding to be inherited correctly.  If proc is &p0,
1915 1936                   * we won't be creating LWPs, so new threads will be assigned
1916 1937                   * to the default processor set.
1917 1938                   */
1918 1939                  ASSERT(curproc == proc || proc == &p0);
1919 1940                  tq->tq_threads_ncpus_pct = pct;
1920 1941                  nthreads = 1;           /* corrected in taskq_thread_create() */
1921 1942                  max_nthreads = TASKQ_THREADS_PCT(max_ncpus, pct);
1922 1943  
1923 1944          } else {
1924 1945                  ASSERT3S(nthreads, >=, 1);
1925 1946                  max_nthreads = nthreads;
1926 1947          }
1927 1948  
1928 1949          if (max_nthreads < taskq_minimum_nthreads_max)
1929 1950                  max_nthreads = taskq_minimum_nthreads_max;
1930 1951  
1931 1952          /*
1932 1953           * Make sure the name is 0-terminated, and conforms to the rules for
1933 1954           * C identifiers.
1934 1955           */
1935 1956          (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
1936 1957          strident_canon(tq->tq_name, TASKQ_NAMELEN + 1);
1937 1958  
1938 1959          tq->tq_flags = flags | TASKQ_CHANGING;
1939 1960          tq->tq_active = 0;
1940 1961          tq->tq_instance = instance;
1941 1962          tq->tq_nthreads_target = nthreads;
1942 1963          tq->tq_nthreads_max = max_nthreads;
1943 1964          tq->tq_minalloc = minalloc;
1944 1965          tq->tq_maxalloc = maxalloc;
1945 1966          tq->tq_nbuckets = bsize;
1946 1967          tq->tq_proc = proc;
1947 1968          tq->tq_pri = pri;
1948 1969          tq->tq_DC = dc;
1949 1970          list_link_init(&tq->tq_cpupct_link);
1950 1971  
1951 1972          if (max_nthreads > 1)
1952 1973                  tq->tq_threadlist = kmem_alloc(
1953 1974                      sizeof (kthread_t *) * max_nthreads, KM_SLEEP);
1954 1975  
1955 1976          mutex_enter(&tq->tq_lock);
1956 1977          if (flags & TASKQ_PREPOPULATE) {
1957 1978                  while (minalloc-- > 0)
1958 1979                          taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
1959 1980          }
1960 1981  
1961 1982          /*
1962 1983           * Before we start creating threads for this taskq, take a
1963 1984           * zone hold so the zone can't go away before taskq_destroy
1964 1985           * makes sure all the taskq threads are gone.  This hold is
1965 1986           * similar in purpose to those taken by zthread_create().
1966 1987           */
1967 1988          zone_hold(tq->tq_proc->p_zone);
1968 1989  
1969 1990          /*
1970 1991           * Create the first thread, which will create any other threads
1971 1992           * necessary.  taskq_thread_create will not return until we have
1972 1993           * enough threads to be able to process requests.
1973 1994           */
1974 1995          taskq_thread_create(tq);
1975 1996          mutex_exit(&tq->tq_lock);
1976 1997  
1977 1998          if (flags & TASKQ_DYNAMIC) {
1978 1999                  taskq_bucket_t *bucket = kmem_zalloc(sizeof (taskq_bucket_t) *
1979 2000                      bsize, KM_SLEEP);
1980 2001                  int b_id;
1981 2002  
1982 2003                  tq->tq_buckets = bucket;
1983 2004  
1984 2005                  /* Initialize each bucket */
1985 2006                  for (b_id = 0; b_id < bsize; b_id++, bucket++) {
1986 2007                          mutex_init(&bucket->tqbucket_lock, NULL, MUTEX_DEFAULT,
1987 2008                              NULL);
1988 2009                          cv_init(&bucket->tqbucket_cv, NULL, CV_DEFAULT, NULL);
1989 2010                          bucket->tqbucket_taskq = tq;
1990 2011                          bucket->tqbucket_freelist.tqent_next =
1991 2012                              bucket->tqbucket_freelist.tqent_prev =
1992 2013                              &bucket->tqbucket_freelist;
1993 2014                          if (flags & TASKQ_PREPOPULATE)
1994 2015                                  taskq_bucket_extend(bucket);
1995 2016                  }
1996 2017          }
1997 2018  
1998 2019          /*
1999 2020           * Install kstats.
2000 2021           * We have two cases:
2001 2022           *   1) Instance is provided to taskq_create_instance(). In this case it
2002 2023           *      should be >= 0 and we use it.
2003 2024           *
2004 2025           *   2) Instance is not provided and is automatically generated.
2005 2026           */
2006 2027          if (flags & TASKQ_NOINSTANCE) {
2007 2028                  instance = tq->tq_instance =
2008 2029                      (int)(uintptr_t)vmem_alloc(taskq_id_arena, 1, VM_SLEEP);
2009 2030          }
2010 2031  
2011 2032          if (flags & TASKQ_DYNAMIC) {
2012 2033                  if ((tq->tq_kstat = kstat_create("unix", instance,
2013 2034                      tq->tq_name, "taskq_d", KSTAT_TYPE_NAMED,
2014 2035                      sizeof (taskq_d_kstat) / sizeof (kstat_named_t),
2015 2036                      KSTAT_FLAG_VIRTUAL)) != NULL) {
2016 2037                          tq->tq_kstat->ks_lock = &taskq_d_kstat_lock;
2017 2038                          tq->tq_kstat->ks_data = &taskq_d_kstat;
2018 2039                          tq->tq_kstat->ks_update = taskq_d_kstat_update;
2019 2040                          tq->tq_kstat->ks_private = tq;
2020 2041                          kstat_install(tq->tq_kstat);
2021 2042                  }
2022 2043          } else {
2023 2044                  if ((tq->tq_kstat = kstat_create("unix", instance, tq->tq_name,
2024 2045                      "taskq", KSTAT_TYPE_NAMED,
2025 2046                      sizeof (taskq_kstat) / sizeof (kstat_named_t),
2026 2047                      KSTAT_FLAG_VIRTUAL)) != NULL) {
2027 2048                          tq->tq_kstat->ks_lock = &taskq_kstat_lock;
2028 2049                          tq->tq_kstat->ks_data = &taskq_kstat;
2029 2050                          tq->tq_kstat->ks_update = taskq_kstat_update;
2030 2051                          tq->tq_kstat->ks_private = tq;
2031 2052                          kstat_install(tq->tq_kstat);
2032 2053                  }
2033 2054          }
2034 2055  
2035 2056          return (tq);
2036 2057  }
2037 2058  
2038 2059  /*
2039 2060   * taskq_destroy().
2040 2061   *
2041 2062   * Assumes: by the time taskq_destroy is called no one will use this task queue
2042 2063   * in any way and no one will try to dispatch entries in it.
2043 2064   */
2044 2065  void
2045 2066  taskq_destroy(taskq_t *tq)
2046 2067  {
2047 2068          taskq_bucket_t *b = tq->tq_buckets;
2048 2069          int bid = 0;
2049 2070  
2050 2071          ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));
2051 2072  
2052 2073          /*
2053 2074           * Destroy kstats.
2054 2075           */
2055 2076          if (tq->tq_kstat != NULL) {
2056 2077                  kstat_delete(tq->tq_kstat);
2057 2078                  tq->tq_kstat = NULL;
2058 2079          }
2059 2080  
2060 2081          /*
2061 2082           * Destroy instance if needed.
2062 2083           */
2063 2084          if (tq->tq_flags & TASKQ_NOINSTANCE) {
2064 2085                  vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance),
2065 2086                      1);
2066 2087                  tq->tq_instance = 0;
2067 2088          }
2068 2089  
2069 2090          /*
2070 2091           * Unregister from the cpupct list.
2071 2092           */
2072 2093          if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
2073 2094                  taskq_cpupct_remove(tq);
2074 2095          }
2075 2096  
2076 2097          /*
2077 2098           * Wait for any pending entries to complete.
2078 2099           */
2079 2100          taskq_wait(tq);
2080 2101  
2081 2102          mutex_enter(&tq->tq_lock);
2082 2103          ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
2083 2104              (tq->tq_active == 0));
2084 2105  
2085 2106          /* notify all the threads that they need to exit */
2086 2107          tq->tq_nthreads_target = 0;
2087 2108  
2088 2109          tq->tq_flags |= TASKQ_CHANGING;
2089 2110          cv_broadcast(&tq->tq_dispatch_cv);
2090 2111          cv_broadcast(&tq->tq_exit_cv);
2091 2112  
2092 2113          while (tq->tq_nthreads != 0)
2093 2114                  cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
2094 2115  
2095 2116          if (tq->tq_nthreads_max != 1)
2096 2117                  kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
2097 2118                      tq->tq_nthreads_max);
2098 2119  
2099 2120          tq->tq_minalloc = 0;
2100 2121          while (tq->tq_nalloc != 0)
2101 2122                  taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
2102 2123  
2103 2124          mutex_exit(&tq->tq_lock);
2104 2125  
2105 2126          /*
2106 2127           * Mark each bucket as closing and wake up all sleeping threads.
2107 2128           */
2108 2129          for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
2109 2130                  taskq_ent_t *tqe;
2110 2131  
2111 2132                  mutex_enter(&b->tqbucket_lock);
2112 2133  
2113 2134                  b->tqbucket_flags |= TQBUCKET_CLOSE;
2114 2135                  /* Wake up all sleeping threads */
2115 2136  
2116 2137                  for (tqe = b->tqbucket_freelist.tqent_next;
2117 2138                      tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next)
2118 2139                          cv_signal(&tqe->tqent_cv);
2119 2140  
2120 2141                  ASSERT(b->tqbucket_nalloc == 0);
2121 2142  
2122 2143                  /*
2123 2144                   * At this point we have waited for all pending jobs to
2124 2145                   * complete (in both the task queue and the bucket) and no new
2125 2146                   * jobs should arrive. Wait for all threads to die.
2126 2147                   */
2127 2148                  while (b->tqbucket_nfree > 0)
2128 2149                          cv_wait(&b->tqbucket_cv, &b->tqbucket_lock);
2129 2150                  mutex_exit(&b->tqbucket_lock);
2130 2151                  mutex_destroy(&b->tqbucket_lock);
2131 2152                  cv_destroy(&b->tqbucket_cv);
2132 2153          }
2133 2154  
2134 2155          if (tq->tq_buckets != NULL) {
2135 2156                  ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
2136 2157                  kmem_free(tq->tq_buckets,
2137 2158                      sizeof (taskq_bucket_t) * tq->tq_nbuckets);
2138 2159  
2139 2160                  /* Cleanup fields before returning tq to the cache */
2140 2161                  tq->tq_buckets = NULL;
2141 2162                  tq->tq_tcreates = 0;
2142 2163                  tq->tq_tdeaths = 0;
2143 2164          } else {
2144 2165                  ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
2145 2166          }
2146 2167  
2147 2168          /*
2148 2169           * Now that all the taskq threads are gone, we can
2149 2170           * drop the zone hold taken in taskq_create_common
2150 2171           */
2151 2172          zone_rele(tq->tq_proc->p_zone);
2152 2173  
2153 2174          tq->tq_threads_ncpus_pct = 0;
2154 2175          tq->tq_totaltime = 0;
2155 2176          tq->tq_tasks = 0;
2156 2177          tq->tq_maxtasks = 0;
2157 2178          tq->tq_executed = 0;
2158 2179          kmem_cache_free(taskq_cache, tq);
2159 2180  }
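
A hedged teardown sketch matching the assumption documented above: dispatchers are stopped first, then the queue is destroyed (taskq_destroy() performs the final taskq_wait() itself); my_tq is the hypothetical queue from the earlier sketches.

        static void
        my_taskq_fini(void)
        {
                /* By now nothing may dispatch to my_tq. */
                taskq_destroy(my_tq);
                my_tq = NULL;
        }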
2160 2181  
2161 2182  /*
2162 2183   * Extend a bucket with a new entry on the free list and attach a worker thread
2163 2184   * to it.
2164 2185   *
2165 2186   * Argument: pointer to the bucket.
2166 2187   *
2167 2188   * This function may quietly fail. It is only used by taskq_dispatch() which
2168 2189   * handles such failures properly.
2169 2190   */
2170 2191  static void
2171 2192  taskq_bucket_extend(void *arg)
2172 2193  {
2173 2194          taskq_ent_t *tqe;
2174 2195          taskq_bucket_t *b = (taskq_bucket_t *)arg;
2175 2196          taskq_t *tq = b->tqbucket_taskq;
2176 2197          int nthreads;
2177 2198  
2178 2199          mutex_enter(&tq->tq_lock);
2179 2200  
2180 2201          if (! ENOUGH_MEMORY()) {
2181 2202                  tq->tq_nomem++;
2182 2203                  mutex_exit(&tq->tq_lock);
2183 2204                  return;
2184 2205          }
2185 2206  
2186 2207          /*
2187 2208           * Observe global taskq limits on the number of threads.
2188 2209           */
2189 2210          if (tq->tq_tcreates++ - tq->tq_tdeaths > tq->tq_maxsize) {
2190 2211                  tq->tq_tcreates--;
2191 2212                  mutex_exit(&tq->tq_lock);
2192 2213                  return;
2193 2214          }
2194 2215          mutex_exit(&tq->tq_lock);
2195 2216  
2196 2217          tqe = kmem_cache_alloc(taskq_ent_cache, KM_NOSLEEP);
2197 2218  
2198 2219          if (tqe == NULL) {
2199 2220                  mutex_enter(&tq->tq_lock);
2200 2221                  tq->tq_nomem++;
2201 2222                  tq->tq_tcreates--;
2202 2223                  mutex_exit(&tq->tq_lock);
2203 2224                  return;
2204 2225          }
2205 2226  
2206 2227          ASSERT(tqe->tqent_thread == NULL);
2207 2228  
2208 2229          tqe->tqent_un.tqent_bucket = b;
2209 2230  
2210 2231          /*
2211 2232           * Create a thread in a TS_STOPPED state first. If it is successfully
2212 2233           * created, place the entry on the free list and start the thread.
2213 2234           */
2214 2235          tqe->tqent_thread = thread_create(NULL, 0, taskq_d_thread, tqe,
2215 2236              0, tq->tq_proc, TS_STOPPED, tq->tq_pri);
2216 2237  
2217 2238          /*
2218 2239           * Once the entry is ready, link it to the bucket free list.
2219 2240           */
2220 2241          mutex_enter(&b->tqbucket_lock);
2221 2242          tqe->tqent_func = NULL;
2222 2243          TQ_APPEND(b->tqbucket_freelist, tqe);
2223 2244          b->tqbucket_nfree++;
2224 2245          TQ_STAT(b, tqs_tcreates);
2225 2246  
2226 2247  #if TASKQ_STATISTIC
2227 2248          nthreads = b->tqbucket_stat.tqs_tcreates -
2228 2249              b->tqbucket_stat.tqs_tdeaths;
2229 2250          b->tqbucket_stat.tqs_maxthreads = MAX(nthreads,
2230 2251              b->tqbucket_stat.tqs_maxthreads);
2231 2252  #endif
2232 2253  
2233 2254          mutex_exit(&b->tqbucket_lock);
2234 2255          /*
2235 2256           * Start the stopped thread.
2236 2257           */
2237 2258          thread_lock(tqe->tqent_thread);
2238 2259          tqe->tqent_thread->t_taskq = tq;
2239 2260          tqe->tqent_thread->t_schedflag |= TS_ALLSTART;
2240 2261          setrun_locked(tqe->tqent_thread);
2241 2262          thread_unlock(tqe->tqent_thread);
2242 2263  }
2243 2264  
2244 2265  static int
2245 2266  taskq_kstat_update(kstat_t *ksp, int rw)
2246 2267  {
2247 2268          struct taskq_kstat *tqsp = &taskq_kstat;
2248 2269          taskq_t *tq = ksp->ks_private;
2249 2270  
2250 2271          if (rw == KSTAT_WRITE)
2251 2272                  return (EACCES);
2252 2273  
2253 2274          tqsp->tq_pid.value.ui64 = tq->tq_proc->p_pid;
2254 2275          tqsp->tq_tasks.value.ui64 = tq->tq_tasks;
2255 2276          tqsp->tq_executed.value.ui64 = tq->tq_executed;
2256 2277          tqsp->tq_maxtasks.value.ui64 = tq->tq_maxtasks;
2257 2278          tqsp->tq_totaltime.value.ui64 = tq->tq_totaltime;
2258 2279          tqsp->tq_nactive.value.ui64 = tq->tq_active;
2259 2280          tqsp->tq_nalloc.value.ui64 = tq->tq_nalloc;
2260 2281          tqsp->tq_pri.value.ui64 = tq->tq_pri;
2261 2282          tqsp->tq_nthreads.value.ui64 = tq->tq_nthreads;
2262 2283          tqsp->tq_nomem.value.ui64 = tq->tq_nomem;
2263 2284          return (0);
2264 2285  }
2265 2286  
2266 2287  static int
2267 2288  taskq_d_kstat_update(kstat_t *ksp, int rw)
2268 2289  {
2269 2290          struct taskq_d_kstat *tqsp = &taskq_d_kstat;
2270 2291          taskq_t *tq = ksp->ks_private;
2271 2292          taskq_bucket_t *b = tq->tq_buckets;
2272 2293          int bid = 0;
2273 2294  
2274 2295          if (rw == KSTAT_WRITE)
2275 2296                  return (EACCES);
2276 2297  
2277 2298          ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
2278 2299  
2279 2300          tqsp->tqd_btasks.value.ui64 = tq->tq_tasks;
2280 2301          tqsp->tqd_bexecuted.value.ui64 = tq->tq_executed;
2281 2302          tqsp->tqd_bmaxtasks.value.ui64 = tq->tq_maxtasks;
2282 2303          tqsp->tqd_bnalloc.value.ui64 = tq->tq_nalloc;
2283 2304          tqsp->tqd_bnactive.value.ui64 = tq->tq_active;
2284 2305          tqsp->tqd_btotaltime.value.ui64 = tq->tq_totaltime;
2285 2306          tqsp->tqd_pri.value.ui64 = tq->tq_pri;
2286 2307          tqsp->tqd_nomem.value.ui64 = tq->tq_nomem;
2287 2308  
2288 2309          tqsp->tqd_hits.value.ui64 = 0;
2289 2310          tqsp->tqd_misses.value.ui64 = 0;
2290 2311          tqsp->tqd_overflows.value.ui64 = 0;
2291 2312          tqsp->tqd_tcreates.value.ui64 = 0;
2292 2313          tqsp->tqd_tdeaths.value.ui64 = 0;
2293 2314          tqsp->tqd_maxthreads.value.ui64 = 0;
2294 2315          tqsp->tqd_nomem.value.ui64 = 0;
2295 2316          tqsp->tqd_disptcreates.value.ui64 = 0;
2296 2317          tqsp->tqd_totaltime.value.ui64 = 0;
2297 2318          tqsp->tqd_nalloc.value.ui64 = 0;
2298 2319          tqsp->tqd_nfree.value.ui64 = 0;
2299 2320  
2300 2321          for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
2301 2322                  tqsp->tqd_hits.value.ui64 += b->tqbucket_stat.tqs_hits;
2302 2323                  tqsp->tqd_misses.value.ui64 += b->tqbucket_stat.tqs_misses;
2303 2324                  tqsp->tqd_overflows.value.ui64 += b->tqbucket_stat.tqs_overflow;
2304 2325                  tqsp->tqd_tcreates.value.ui64 += b->tqbucket_stat.tqs_tcreates;
2305 2326                  tqsp->tqd_tdeaths.value.ui64 += b->tqbucket_stat.tqs_tdeaths;
2306 2327                  tqsp->tqd_maxthreads.value.ui64 +=
2307 2328                      b->tqbucket_stat.tqs_maxthreads;
2308 2329                  tqsp->tqd_disptcreates.value.ui64 +=
2309 2330                      b->tqbucket_stat.tqs_disptcreates;
2310 2331                  tqsp->tqd_totaltime.value.ui64 += b->tqbucket_totaltime;
2311 2332                  tqsp->tqd_nalloc.value.ui64 += b->tqbucket_nalloc;
2312 2333                  tqsp->tqd_nfree.value.ui64 += b->tqbucket_nfree;
2313 2334          }
2314 2335          return (0);
2315 2336  }