Print this page
    
OS-6363 system went to dark side of moon for ~467 seconds
OS-6404 ARC reclaim should throttle its calls to arc_kmem_reap_now()
Reviewed by: Bryan Cantrill <bryan@joyent.com>
Reviewed by: Dan McDonald <danmcd@joyent.com>
    
      
        | Split | 
	Close | 
      
      | Expand all | 
      | Collapse all | 
    
    
          --- old/usr/src/lib/libzpool/common/taskq.c
          +++ new/usr/src/lib/libzpool/common/taskq.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  
    | 
      ↓ open down ↓ | 
    18 lines elided | 
    
      ↑ open up ↑ | 
  
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23   23   * Use is subject to license terms.
  24   24   */
  25   25  /*
  26   26   * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
  27   27   * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
  28   28   * Copyright (c) 2014 by Delphix. All rights reserved.
       29 + * Copyright (c) 2017, Joyent, Inc.
  29   30   */
  30   31  
  31   32  #include <sys/zfs_context.h>
  32   33  
  33   34  int taskq_now;
  34   35  taskq_t *system_taskq;
  35   36  
  36   37  #define TASKQ_ACTIVE    0x00010000
  37   38  #define TASKQ_NAMELEN   31
  38   39  
  39   40  struct taskq {
  40   41          char            tq_name[TASKQ_NAMELEN + 1];
  41   42          kmutex_t        tq_lock;
  42   43          krwlock_t       tq_threadlock;
  43   44          kcondvar_t      tq_dispatch_cv;
  44   45          kcondvar_t      tq_wait_cv;
  45   46          thread_t        *tq_threadlist;
  46   47          int             tq_flags;
  47   48          int             tq_active;
  48   49          int             tq_nthreads;
  49   50          int             tq_nalloc;
  50   51          int             tq_minalloc;
  51   52          int             tq_maxalloc;
  52   53          kcondvar_t      tq_maxalloc_cv;
  53   54          int             tq_maxalloc_wait;
  54   55          taskq_ent_t     *tq_freelist;
  55   56          taskq_ent_t     tq_task;
  56   57  };
  57   58  
  58   59  static taskq_ent_t *
  59   60  task_alloc(taskq_t *tq, int tqflags)
  60   61  {
  61   62          taskq_ent_t *t;
  62   63          int rv;
  63   64  
  64   65  again:  if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
  65   66                  tq->tq_freelist = t->tqent_next;
  66   67          } else {
  67   68                  if (tq->tq_nalloc >= tq->tq_maxalloc) {
  68   69                          if (!(tqflags & KM_SLEEP))
  69   70                                  return (NULL);
  70   71  
  71   72                          /*
  72   73                           * We don't want to exceed tq_maxalloc, but we can't
  73   74                           * wait for other tasks to complete (and thus free up
  74   75                           * task structures) without risking deadlock with
  75   76                           * the caller.  So, we just delay for one second
  76   77                           * to throttle the allocation rate. If we have tasks
  77   78                           * complete before one second timeout expires then
  78   79                           * taskq_ent_free will signal us and we will
  79   80                           * immediately retry the allocation.
  80   81                           */
  81   82                          tq->tq_maxalloc_wait++;
  82   83                          rv = cv_timedwait(&tq->tq_maxalloc_cv,
  83   84                              &tq->tq_lock, ddi_get_lbolt() + hz);
  84   85                          tq->tq_maxalloc_wait--;
  85   86                          if (rv > 0)
  86   87                                  goto again;             /* signaled */
  87   88                  }
  88   89                  mutex_exit(&tq->tq_lock);
  89   90  
  90   91                  t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
  91   92  
  92   93                  mutex_enter(&tq->tq_lock);
  93   94                  if (t != NULL)
  94   95                          tq->tq_nalloc++;
  95   96          }
  96   97          return (t);
  97   98  }
  98   99  
  99  100  static void
 100  101  task_free(taskq_t *tq, taskq_ent_t *t)
 101  102  {
 102  103          if (tq->tq_nalloc <= tq->tq_minalloc) {
 103  104                  t->tqent_next = tq->tq_freelist;
 104  105                  tq->tq_freelist = t;
 105  106          } else {
 106  107                  tq->tq_nalloc--;
 107  108                  mutex_exit(&tq->tq_lock);
 108  109                  kmem_free(t, sizeof (taskq_ent_t));
 109  110                  mutex_enter(&tq->tq_lock);
 110  111          }
 111  112  
 112  113          if (tq->tq_maxalloc_wait)
 113  114                  cv_signal(&tq->tq_maxalloc_cv);
 114  115  }
 115  116  
 116  117  taskqid_t
 117  118  taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 118  119  {
 119  120          taskq_ent_t *t;
 120  121  
 121  122          if (taskq_now) {
 122  123                  func(arg);
 123  124                  return (1);
 124  125          }
 125  126  
 126  127          mutex_enter(&tq->tq_lock);
 127  128          ASSERT(tq->tq_flags & TASKQ_ACTIVE);
 128  129          if ((t = task_alloc(tq, tqflags)) == NULL) {
 129  130                  mutex_exit(&tq->tq_lock);
 130  131                  return (0);
 131  132          }
 132  133          if (tqflags & TQ_FRONT) {
 133  134                  t->tqent_next = tq->tq_task.tqent_next;
 134  135                  t->tqent_prev = &tq->tq_task;
 135  136          } else {
 136  137                  t->tqent_next = &tq->tq_task;
 137  138                  t->tqent_prev = tq->tq_task.tqent_prev;
 138  139          }
 139  140          t->tqent_next->tqent_prev = t;
 140  141          t->tqent_prev->tqent_next = t;
 141  142          t->tqent_func = func;
 142  143          t->tqent_arg = arg;
 143  144          t->tqent_flags = 0;
 144  145          cv_signal(&tq->tq_dispatch_cv);
 145  146          mutex_exit(&tq->tq_lock);
 146  147          return (1);
 147  148  }
 148  149  
 149  150  void
 150  151  taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 151  152      taskq_ent_t *t)
 152  153  {
 153  154          ASSERT(func != NULL);
 154  155          ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 155  156  
 156  157          /*
 157  158           * Mark it as a prealloc'd task.  This is important
 158  159           * to ensure that we don't free it later.
 159  160           */
 160  161          t->tqent_flags |= TQENT_FLAG_PREALLOC;
 161  162          /*
 162  163           * Enqueue the task to the underlying queue.
 163  164           */
 164  165          mutex_enter(&tq->tq_lock);
 165  166  
 166  167          if (flags & TQ_FRONT) {
 167  168                  t->tqent_next = tq->tq_task.tqent_next;
 168  169                  t->tqent_prev = &tq->tq_task;
 169  170          } else {
 170  171                  t->tqent_next = &tq->tq_task;
  
    | 
      ↓ open down ↓ | 
    132 lines elided | 
    
      ↑ open up ↑ | 
  
 171  172                  t->tqent_prev = tq->tq_task.tqent_prev;
 172  173          }
 173  174          t->tqent_next->tqent_prev = t;
 174  175          t->tqent_prev->tqent_next = t;
 175  176          t->tqent_func = func;
 176  177          t->tqent_arg = arg;
 177  178          cv_signal(&tq->tq_dispatch_cv);
 178  179          mutex_exit(&tq->tq_lock);
 179  180  }
 180  181  
      182 +boolean_t
      183 +taskq_empty(taskq_t *tq)
      184 +{
      185 +        boolean_t rv;
      186 +
      187 +        mutex_enter(&tq->tq_lock);
      188 +        rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0);
      189 +        mutex_exit(&tq->tq_lock);
      190 +
      191 +        return (rv);
      192 +}
      193 +
 181  194  void
 182  195  taskq_wait(taskq_t *tq)
 183  196  {
 184  197          mutex_enter(&tq->tq_lock);
 185  198          while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
 186  199                  cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
 187  200          mutex_exit(&tq->tq_lock);
 188  201  }
 189  202  
 190  203  static void *
 191  204  taskq_thread(void *arg)
 192  205  {
 193  206          taskq_t *tq = arg;
 194  207          taskq_ent_t *t;
 195  208          boolean_t prealloc;
 196  209  
 197  210          mutex_enter(&tq->tq_lock);
 198  211          while (tq->tq_flags & TASKQ_ACTIVE) {
 199  212                  if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
 200  213                          if (--tq->tq_active == 0)
 201  214                                  cv_broadcast(&tq->tq_wait_cv);
 202  215                          cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
 203  216                          tq->tq_active++;
 204  217                          continue;
 205  218                  }
 206  219                  t->tqent_prev->tqent_next = t->tqent_next;
 207  220                  t->tqent_next->tqent_prev = t->tqent_prev;
 208  221                  t->tqent_next = NULL;
 209  222                  t->tqent_prev = NULL;
 210  223                  prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
 211  224                  mutex_exit(&tq->tq_lock);
 212  225  
 213  226                  rw_enter(&tq->tq_threadlock, RW_READER);
 214  227                  t->tqent_func(t->tqent_arg);
 215  228                  rw_exit(&tq->tq_threadlock);
 216  229  
 217  230                  mutex_enter(&tq->tq_lock);
 218  231                  if (!prealloc)
 219  232                          task_free(tq, t);
 220  233          }
 221  234          tq->tq_nthreads--;
 222  235          cv_broadcast(&tq->tq_wait_cv);
 223  236          mutex_exit(&tq->tq_lock);
 224  237          return (NULL);
 225  238  }
 226  239  
 227  240  /*ARGSUSED*/
 228  241  taskq_t *
 229  242  taskq_create(const char *name, int nthreads, pri_t pri,
 230  243          int minalloc, int maxalloc, uint_t flags)
 231  244  {
 232  245          taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
 233  246          int t;
 234  247  
 235  248          if (flags & TASKQ_THREADS_CPU_PCT) {
 236  249                  int pct;
 237  250                  ASSERT3S(nthreads, >=, 0);
 238  251                  ASSERT3S(nthreads, <=, 100);
 239  252                  pct = MIN(nthreads, 100);
 240  253                  pct = MAX(pct, 0);
 241  254  
 242  255                  nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
 243  256                  nthreads = MAX(nthreads, 1);    /* need at least 1 thread */
 244  257          } else {
 245  258                  ASSERT3S(nthreads, >=, 1);
 246  259          }
 247  260  
 248  261          rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
 249  262          mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
 250  263          cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
 251  264          cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
 252  265          cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
 253  266          (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
 254  267          tq->tq_flags = flags | TASKQ_ACTIVE;
 255  268          tq->tq_active = nthreads;
 256  269          tq->tq_nthreads = nthreads;
 257  270          tq->tq_minalloc = minalloc;
 258  271          tq->tq_maxalloc = maxalloc;
 259  272          tq->tq_task.tqent_next = &tq->tq_task;
 260  273          tq->tq_task.tqent_prev = &tq->tq_task;
 261  274          tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
 262  275  
 263  276          if (flags & TASKQ_PREPOPULATE) {
 264  277                  mutex_enter(&tq->tq_lock);
 265  278                  while (minalloc-- > 0)
 266  279                          task_free(tq, task_alloc(tq, KM_SLEEP));
 267  280                  mutex_exit(&tq->tq_lock);
 268  281          }
 269  282  
 270  283          for (t = 0; t < nthreads; t++)
 271  284                  (void) thr_create(0, 0, taskq_thread,
 272  285                      tq, THR_BOUND, &tq->tq_threadlist[t]);
 273  286  
 274  287          return (tq);
 275  288  }
 276  289  
 277  290  void
 278  291  taskq_destroy(taskq_t *tq)
 279  292  {
 280  293          int t;
 281  294          int nthreads = tq->tq_nthreads;
 282  295  
 283  296          taskq_wait(tq);
 284  297  
 285  298          mutex_enter(&tq->tq_lock);
 286  299  
 287  300          tq->tq_flags &= ~TASKQ_ACTIVE;
 288  301          cv_broadcast(&tq->tq_dispatch_cv);
 289  302  
 290  303          while (tq->tq_nthreads != 0)
 291  304                  cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
 292  305  
 293  306          tq->tq_minalloc = 0;
 294  307          while (tq->tq_nalloc != 0) {
 295  308                  ASSERT(tq->tq_freelist != NULL);
 296  309                  task_free(tq, task_alloc(tq, KM_SLEEP));
 297  310          }
 298  311  
 299  312          mutex_exit(&tq->tq_lock);
 300  313  
 301  314          for (t = 0; t < nthreads; t++)
 302  315                  (void) thr_join(tq->tq_threadlist[t], NULL, NULL);
 303  316  
 304  317          kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
 305  318  
 306  319          rw_destroy(&tq->tq_threadlock);
 307  320          mutex_destroy(&tq->tq_lock);
 308  321          cv_destroy(&tq->tq_dispatch_cv);
 309  322          cv_destroy(&tq->tq_wait_cv);
 310  323          cv_destroy(&tq->tq_maxalloc_cv);
 311  324  
 312  325          kmem_free(tq, sizeof (taskq_t));
 313  326  }
 314  327  
 315  328  int
 316  329  taskq_member(taskq_t *tq, void *t)
 317  330  {
 318  331          int i;
 319  332  
 320  333          if (taskq_now)
 321  334                  return (1);
 322  335  
 323  336          for (i = 0; i < tq->tq_nthreads; i++)
 324  337                  if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
 325  338                          return (1);
 326  339  
 327  340          return (0);
 328  341  }
 329  342  
 330  343  void
 331  344  system_taskq_init(void)
 332  345  {
 333  346          system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
 334  347              TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 335  348  }
 336  349  
 337  350  void
 338  351  system_taskq_fini(void)
 339  352  {
 340  353          taskq_destroy(system_taskq);
 341  354          system_taskq = NULL; /* defensive */
 342  355  }
  
    | 
      ↓ open down ↓ | 
    152 lines elided | 
    
      ↑ open up ↑ | 
  
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX