Print this page
    
9017 Introduce taskq_empty()
Reviewed by: Bryan Cantrill <bryan@joyent.com>
Reviewed by: Dan McDonald <danmcd@joyent.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Yuri Pankov <yuripv@yuripv.net>
    
      
        | Split | 
	Close | 
      
      | Expand all | 
      | Collapse all | 
    
    
          --- old/usr/src/lib/libfakekernel/common/taskq.c
          +++ new/usr/src/lib/libfakekernel/common/taskq.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  
    | 
      ↓ open down ↓ | 
    18 lines elided | 
    
      ↑ open up ↑ | 
  
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23   23   * Use is subject to license terms.
  24   24   */
  25   25  /*
  26   26   * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
  27   27   * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  28   28   * Copyright 2017 RackTop Systems.
       29 + * Copyright 2018, Joyent, Inc.
  29   30   */
  30   31  
  31   32  #include <sys/taskq_impl.h>
  32   33  
  33   34  #include <sys/class.h>
  34   35  #include <sys/debug.h>
  35   36  #include <sys/ksynch.h>
  36   37  #include <sys/kmem.h>
  37   38  #include <sys/time.h>
  38   39  #include <sys/systm.h>
  39   40  #include <sys/sysmacros.h>
  40   41  #include <sys/unistd.h>
  41   42  
  42   43  /* avoid <sys/disp.h> */
  43   44  #define maxclsyspri     99
  44   45  
  45   46  /* avoid <unistd.h> */
  46   47  extern long sysconf(int);
  47   48  
  48   49  /* avoiding <thread.h> */
  49   50  typedef unsigned int thread_t;
  50   51  typedef unsigned int thread_key_t;
  51   52  
  52   53  extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
  53   54                          thread_t *);
  54   55  extern int thr_join(thread_t, thread_t *, void **);
  55   56  
  56   57  /*
  57   58   * POSIX.1c Note:
  58   59   * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
  59   60   * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
  60   61   * Any changes in these definitions should be reflected in <pthread.h>
  61   62   */
  62   63  #define THR_BOUND               0x00000001      /* = PTHREAD_SCOPE_SYSTEM */
  63   64  #define THR_NEW_LWP             0x00000002
  64   65  #define THR_DETACHED            0x00000040      /* = PTHREAD_CREATE_DETACHED */
  65   66  #define THR_SUSPENDED           0x00000080
  66   67  #define THR_DAEMON              0x00000100
  67   68  
  68   69  
  69   70  int taskq_now;
  70   71  taskq_t *system_taskq;
  71   72  
  72   73  #define TASKQ_ACTIVE    0x00010000
  73   74  
/*
 * Userland (libfakekernel) stand-in for the kernel taskq.  Pending work
 * lives on a circular doubly-linked list whose head/sentinel is tq_task;
 * tq_nthreads bound worker threads service it.
 */
struct taskq {
	kmutex_t	tq_lock;	/* protects all fields below */
	krwlock_t	tq_threadlock;	/* held as READER while a task runs */
	kcondvar_t	tq_dispatch_cv;	/* signaled when work is enqueued */
	kcondvar_t	tq_wait_cv;	/* broadcast on drain / thread exit */
	thread_t	*tq_threadlist;	/* array of tq_nthreads worker ids */
	int		tq_flags;	/* TASKQ_* creation flags | TASKQ_ACTIVE */
	int		tq_active;	/* workers not blocked in cv_wait */
	int		tq_nthreads;	/* worker thread count */
	int		tq_nalloc;	/* taskq_ent_t's currently allocated */
	int		tq_minalloc;	/* keep at least this many cached */
	int		tq_maxalloc;	/* soft cap on tq_nalloc (task_alloc) */
	kcondvar_t	tq_maxalloc_cv;	/* wakes allocators throttled at cap */
	int		tq_maxalloc_wait; /* count of throttled allocators */
	taskq_ent_t	*tq_freelist;	/* singly-linked free-entry cache */
	taskq_ent_t	tq_task;	/* sentinel of the pending-task list */
};
  91   92  
/*
 * Allocate a task entry, preferring the free-list cache.  Called and
 * returns with tq_lock held, but may drop and re-acquire it around
 * kmem_alloc() and while throttled on tq_maxalloc_cv.  Returns NULL
 * only for non-KM_SLEEP callers when tq_maxalloc has been reached.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		/* Reuse a cached entry (keeping tq_minalloc in reserve). */
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate. If we have tasks
			 * complete before one second timeout expires then
			 * taskq_ent_free will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
			/*
			 * Timed out: fall through and allocate anyway,
			 * transiently exceeding tq_maxalloc.
			 */
		}
		/* Drop tq_lock across the (possibly sleeping) allocation. */
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL)
			tq->tq_nalloc++;
	}
	return (t);
}
 132  133  
/*
 * Release a task entry: cache it on the free list while tq_nalloc is at
 * or below tq_minalloc, otherwise give it back to the allocator.  Called
 * and returns with tq_lock held; drops it briefly around kmem_free().
 * Wakes any allocator throttled in task_alloc().
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		/* kmem_free() may sleep; don't hold tq_lock across it. */
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}
 149  150  
 150  151  taskqid_t
 151  152  taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 152  153  {
 153  154          taskq_ent_t *t;
 154  155  
 155  156          if (taskq_now) {
 156  157                  func(arg);
 157  158                  return (1);
 158  159          }
 159  160  
 160  161          mutex_enter(&tq->tq_lock);
 161  162          ASSERT(tq->tq_flags & TASKQ_ACTIVE);
 162  163          if ((t = task_alloc(tq, tqflags)) == NULL) {
 163  164                  mutex_exit(&tq->tq_lock);
 164  165                  return (0);
 165  166          }
 166  167          if (tqflags & TQ_FRONT) {
 167  168                  t->tqent_next = tq->tq_task.tqent_next;
 168  169                  t->tqent_prev = &tq->tq_task;
 169  170          } else {
 170  171                  t->tqent_next = &tq->tq_task;
 171  172                  t->tqent_prev = tq->tq_task.tqent_prev;
 172  173          }
 173  174          t->tqent_next->tqent_prev = t;
 174  175          t->tqent_prev->tqent_next = t;
 175  176          t->tqent_func = func;
 176  177          t->tqent_arg = arg;
 177  178          t->tqent_flags = 0;
 178  179          cv_signal(&tq->tq_dispatch_cv);
 179  180          mutex_exit(&tq->tq_lock);
 180  181          return (1);
 181  182  }
 182  183  
 183  184  void
 184  185  taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 185  186      taskq_ent_t *t)
 186  187  {
 187  188          ASSERT(func != NULL);
 188  189          ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 189  190  
 190  191          /*
 191  192           * Mark it as a prealloc'd task.  This is important
 192  193           * to ensure that we don't free it later.
 193  194           */
 194  195          t->tqent_flags |= TQENT_FLAG_PREALLOC;
 195  196          /*
 196  197           * Enqueue the task to the underlying queue.
 197  198           */
 198  199          mutex_enter(&tq->tq_lock);
 199  200  
 200  201          if (flags & TQ_FRONT) {
 201  202                  t->tqent_next = tq->tq_task.tqent_next;
 202  203                  t->tqent_prev = &tq->tq_task;
 203  204          } else {
 204  205                  t->tqent_next = &tq->tq_task;
  
    | 
      ↓ open down ↓ | 
    166 lines elided | 
    
      ↑ open up ↑ | 
  
 205  206                  t->tqent_prev = tq->tq_task.tqent_prev;
 206  207          }
 207  208          t->tqent_next->tqent_prev = t;
 208  209          t->tqent_prev->tqent_next = t;
 209  210          t->tqent_func = func;
 210  211          t->tqent_arg = arg;
 211  212          cv_signal(&tq->tq_dispatch_cv);
 212  213          mutex_exit(&tq->tq_lock);
 213  214  }
 214  215  
      216 +boolean_t
      217 +taskq_empty(taskq_t *tq)
      218 +{
      219 +        boolean_t rv;
      220 +
      221 +        mutex_enter(&tq->tq_lock);
      222 +        rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0);
      223 +        mutex_exit(&tq->tq_lock);
      224 +
      225 +        return (rv);
      226 +}
      227 +
/*
 * Block until the taskq drains: no entries pending and no worker active
 * (tq_active == 0).  Does not prevent new dispatches from re-filling
 * the queue after this returns.
 */
void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}
 223  236  
/*
 * Worker-thread main loop: pull entries off the pending list and run
 * them until taskq_destroy() clears TASKQ_ACTIVE.
 */
static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			/*
			 * Queue is empty.  If we are the last active
			 * worker, it is fully drained; wake taskq_wait().
			 */
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		/* Unlink the entry before dropping tq_lock. */
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/* Remember ownership now; t may be caller-owned memory. */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		/*
		 * Run the task under tq_threadlock as READER.
		 * NOTE(review): no writer of tq_threadlock is visible in
		 * this file — presumably reserved as a barrier for some
		 * external synchronization; confirm before relying on it.
		 */
		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	/* Shutting down: account for our exit and wake taskq_destroy(). */
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}
 260  273  
/*
 * Create a taskq not associated with any particular process; thin
 * wrapper that forwards to taskq_create_proc() with a NULL proc.
 */
/*ARGSUSED*/
taskq_t *
taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	return (taskq_create_proc(name, nthr, pri,
	    minalloc, maxalloc, NULL, flags));
}
 269  282  
/*
 * Compatibility shim for the sysdc (scheduling-class duty cycle)
 * variant.  In this userland emulation the duty cycle 'dc' is ignored
 * and the threads simply get maxclsyspri.
 */
/*ARGSUSED*/
taskq_t *
taskq_create_sysdc(const char *name, int nthr, int minalloc,
    int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
{
	return (taskq_create_proc(name, nthr, maxclsyspri,
	    minalloc, maxalloc, proc, flags));
}
 278  291  
/*
 * Create a taskq backed by 'nthreads' bound worker threads.  'name',
 * 'pri', and 'proc' are accepted for kernel-API compatibility but are
 * not used by this userland implementation beyond the signature.
 */
/*ARGSUSED*/
taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, proc_t *proc, uint_t flags)
{
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		/* 'nthreads' is a percentage of online CPUs, not a count. */
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	tq->tq_flags = flags | TASKQ_ACTIVE;
	/* Workers start counted as active; each decrements when it idles. */
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	/* Empty circular list: the sentinel points at itself. */
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		/* Warm the free list with 'minalloc' cached entries. */
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		(void) thr_create(0, 0, taskq_thread,
		    tq, THR_BOUND, &tq->tq_threadlist[t]);

	return (tq);
}
 327  340  
/*
 * Tear down a taskq: drain pending work, stop and join the worker
 * threads, free cached entries, then release all resources.  The
 * caller must ensure no new dispatches occur once this is called.
 */
void
taskq_destroy(taskq_t *tq)
{
	int t;
	int nthreads = tq->tq_nthreads;

	/* Let all queued work finish first. */
	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	/* Tell workers to exit and wake any blocked on the dispatch cv. */
	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/* Drop the reserve floor so every cached entry can be freed. */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	/* Reap the (bound, joinable) worker threads. */
	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}
 365  378  
 366  379  int
 367  380  taskq_member(taskq_t *tq, struct _kthread *t)
 368  381  {
 369  382          int i;
 370  383  
 371  384          if (taskq_now)
 372  385                  return (1);
 373  386  
 374  387          for (i = 0; i < tq->tq_nthreads; i++)
 375  388                  if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
 376  389                          return (1);
 377  390  
 378  391          return (0);
 379  392  }
 380  393  
/*
 * Create the global system_taskq (64 threads, prepopulated entry cache).
 * NOTE(review): TASKQ_DYNAMIC is passed here, yet taskq_dispatch_ent()
 * asserts !TASKQ_DYNAMIC, so dispatching a prealloc'd entry to
 * system_taskq would trip that assertion — confirm this is intended.
 */
void
system_taskq_init(void)
{
	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}
 387  400  
/*
 * Destroy the global system_taskq created by system_taskq_init().
 */
void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
}
  
    | 
      ↓ open down ↓ | 
    169 lines elided | 
    
      ↑ open up ↑ | 
  
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX