1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /*
  26  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
  27  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  28  * Copyright (c) 2017, Joyent, Inc.
  29  */
  30 
  31 #include <sys/taskq_impl.h>
  32 
  33 #include <sys/class.h>
  34 #include <sys/debug.h>
  35 #include <sys/ksynch.h>
  36 #include <sys/kmem.h>
  37 #include <sys/time.h>
  38 #include <sys/systm.h>
  39 #include <sys/sysmacros.h>
  40 #include <sys/unistd.h>
  41 
/*
 * sysconf() is declared by hand rather than by including <unistd.h>
 * (presumably to avoid conflicts with the system headers above -- TODO
 * confirm).  It is used by taskq_create_proc() for TASKQ_THREADS_CPU_PCT.
 */
extern long sysconf(int);

/*
 * Likewise, the libc thread interfaces are declared here instead of
 * including <thread.h>.  thread_key_t is not referenced in this file.
 */
typedef unsigned int thread_t;
typedef unsigned int thread_key_t;

/* Used to start (taskq_create_proc) and reap (taskq_destroy) workers. */
extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
			thread_t *);
extern int thr_join(thread_t, thread_t *, void **);

/*
 * POSIX.1c Note:
 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
 * Any changes in these definitions should be reflected in <pthread.h>
 *
 * These values mirror the thr_create() flags from <thread.h>; only
 * THR_BOUND is used in this file (by taskq_create_proc()).
 */
#define THR_BOUND               0x00000001      /* = PTHREAD_SCOPE_SYSTEM */
#define THR_NEW_LWP             0x00000002
#define THR_DETACHED            0x00000040      /* = PTHREAD_CREATE_DETACHED */
#define THR_SUSPENDED           0x00000080
#define THR_DAEMON              0x00000100
  64 
  65 
/*
 * When nonzero, taskq_dispatch() runs the function synchronously in the
 * calling thread instead of queueing it, and taskq_member() reports every
 * thread as a member.
 */
int taskq_now;
/* Shared queue: created by system_taskq_init(), torn down by system_taskq_fini(). */
taskq_t *system_taskq;

/* Set in tq_flags while the queue is running; cleared by taskq_destroy(). */
#define TASKQ_ACTIVE	0x00010000

/*
 * A task queue.  tq_lock guards the pending list, the free list and the
 * counters below; note that taskq_member() reads tq_nthreads and
 * tq_threadlist without taking it.
 */
struct taskq {
	kmutex_t	tq_lock;
	/* Held as reader by a worker while it runs a task function. */
	krwlock_t	tq_threadlock;
	/* Signalled on dispatch; broadcast by taskq_destroy(). */
	kcondvar_t	tq_dispatch_cv;
	/* Broadcast when workers all go idle or a worker exits. */
	kcondvar_t	tq_wait_cv;
	/* IDs of the worker threads, one per thread created. */
	thread_t	*tq_threadlist;
	/* Creation flags (TASKQ_*) plus TASKQ_ACTIVE. */
	int		tq_flags;
	/* Number of workers not idle-waiting on tq_dispatch_cv. */
	int		tq_active;
	/* Number of worker threads that have not yet exited. */
	int		tq_nthreads;
	/* Task entries currently allocated (queued, running or cached). */
	int		tq_nalloc;
	/* Entries at or below this count are cached instead of freed. */
	int		tq_minalloc;
	/* Soft cap on tq_nalloc; see task_alloc(). */
	int		tq_maxalloc;
	/* Signalled by task_free() to wake allocators throttled at the cap. */
	kcondvar_t	tq_maxalloc_cv;
	/* Number of threads currently throttled in task_alloc(). */
	int		tq_maxalloc_wait;
	/* Cached entries, singly linked through tqent_next. */
	taskq_ent_t	*tq_freelist;
	/* Sentinel head of the circular, doubly linked pending-task list. */
	taskq_ent_t	tq_task;
};
  88 
  89 static taskq_ent_t *
  90 task_alloc(taskq_t *tq, int tqflags)
  91 {
  92         taskq_ent_t *t;
  93         int rv;
  94 
  95 again:  if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
  96                 tq->tq_freelist = t->tqent_next;
  97         } else {
  98                 if (tq->tq_nalloc >= tq->tq_maxalloc) {
  99                         if (!(tqflags & KM_SLEEP))
 100                                 return (NULL);
 101 
 102                         /*
 103                          * We don't want to exceed tq_maxalloc, but we can't
 104                          * wait for other tasks to complete (and thus free up
 105                          * task structures) without risking deadlock with
 106                          * the caller.  So, we just delay for one second
 107                          * to throttle the allocation rate. If we have tasks
 108                          * complete before one second timeout expires then
 109                          * taskq_ent_free will signal us and we will
 110                          * immediately retry the allocation.
 111                          */
 112                         tq->tq_maxalloc_wait++;
 113                         rv = cv_timedwait(&tq->tq_maxalloc_cv,
 114                             &tq->tq_lock, ddi_get_lbolt() + hz);
 115                         tq->tq_maxalloc_wait--;
 116                         if (rv > 0)
 117                                 goto again;             /* signaled */
 118                 }
 119                 mutex_exit(&tq->tq_lock);
 120 
 121                 t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
 122 
 123                 mutex_enter(&tq->tq_lock);
 124                 if (t != NULL)
 125                         tq->tq_nalloc++;
 126         }
 127         return (t);
 128 }
 129 
 130 static void
 131 task_free(taskq_t *tq, taskq_ent_t *t)
 132 {
 133         if (tq->tq_nalloc <= tq->tq_minalloc) {
 134                 t->tqent_next = tq->tq_freelist;
 135                 tq->tq_freelist = t;
 136         } else {
 137                 tq->tq_nalloc--;
 138                 mutex_exit(&tq->tq_lock);
 139                 kmem_free(t, sizeof (taskq_ent_t));
 140                 mutex_enter(&tq->tq_lock);
 141         }
 142 
 143         if (tq->tq_maxalloc_wait)
 144                 cv_signal(&tq->tq_maxalloc_cv);
 145 }
 146 
 147 taskqid_t
 148 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 149 {
 150         taskq_ent_t *t;
 151 
 152         if (taskq_now) {
 153                 func(arg);
 154                 return (1);
 155         }
 156 
 157         mutex_enter(&tq->tq_lock);
 158         ASSERT(tq->tq_flags & TASKQ_ACTIVE);
 159         if ((t = task_alloc(tq, tqflags)) == NULL) {
 160                 mutex_exit(&tq->tq_lock);
 161                 return (0);
 162         }
 163         if (tqflags & TQ_FRONT) {
 164                 t->tqent_next = tq->tq_task.tqent_next;
 165                 t->tqent_prev = &tq->tq_task;
 166         } else {
 167                 t->tqent_next = &tq->tq_task;
 168                 t->tqent_prev = tq->tq_task.tqent_prev;
 169         }
 170         t->tqent_next->tqent_prev = t;
 171         t->tqent_prev->tqent_next = t;
 172         t->tqent_func = func;
 173         t->tqent_arg = arg;
 174         t->tqent_flags = 0;
 175         cv_signal(&tq->tq_dispatch_cv);
 176         mutex_exit(&tq->tq_lock);
 177         return (1);
 178 }
 179 
 180 void
 181 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 182     taskq_ent_t *t)
 183 {
 184         ASSERT(func != NULL);
 185         ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 186 
 187         /*
 188          * Mark it as a prealloc'd task.  This is important
 189          * to ensure that we don't free it later.
 190          */
 191         t->tqent_flags |= TQENT_FLAG_PREALLOC;
 192         /*
 193          * Enqueue the task to the underlying queue.
 194          */
 195         mutex_enter(&tq->tq_lock);
 196 
 197         if (flags & TQ_FRONT) {
 198                 t->tqent_next = tq->tq_task.tqent_next;
 199                 t->tqent_prev = &tq->tq_task;
 200         } else {
 201                 t->tqent_next = &tq->tq_task;
 202                 t->tqent_prev = tq->tq_task.tqent_prev;
 203         }
 204         t->tqent_next->tqent_prev = t;
 205         t->tqent_prev->tqent_next = t;
 206         t->tqent_func = func;
 207         t->tqent_arg = arg;
 208         cv_signal(&tq->tq_dispatch_cv);
 209         mutex_exit(&tq->tq_lock);
 210 }
 211 
 212 boolean_t
 213 taskq_empty(taskq_t *tq)
 214 {
 215         boolean_t rv;
 216 
 217         mutex_enter(&tq->tq_lock);
 218         rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0);
 219         mutex_exit(&tq->tq_lock);
 220 
 221         return (rv);
 222 }
 223 
 224 void
 225 taskq_wait(taskq_t *tq)
 226 {
 227         mutex_enter(&tq->tq_lock);
 228         while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
 229                 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
 230         mutex_exit(&tq->tq_lock);
 231 }
 232 
/*
 * Worker thread body; arg is the owning taskq_t.
 *
 * Repeatedly removes the task at the head of tq_task and runs it, until
 * TASKQ_ACTIVE is cleared by taskq_destroy().  tq_active counts workers
 * that are not idle.  A worker decrements it before sleeping on
 * tq_dispatch_cv, and when the count reaches zero it wakes waiters on
 * tq_wait_cv (taskq_wait()).  On exit, the worker decrements tq_nthreads
 * and broadcasts tq_wait_cv so that taskq_destroy() sees it.  Always
 * returns NULL.
 */
static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			/* Queue empty: go idle until a dispatch or destroy. */
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			/* Re-check TASKQ_ACTIVE and the queue after waking. */
			continue;
		}
		/* Unlink t from the pending list while holding tq_lock. */
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/*
		 * Record ownership before running the task.  For a
		 * caller-supplied (prealloc) entry, t is not touched again
		 * after the function runs, presumably because its owner may
		 * reuse it at that point.
		 */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		/* Run the task without tq_lock, holding tq_threadlock shared. */
		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		/* Only entries that came from task_alloc() go back to the pool. */
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}
 269 
 270 /*ARGSUSED*/
 271 taskq_t *
 272 taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
 273     int maxalloc, uint_t flags)
 274 {
 275         return (taskq_create_proc(name, nthr, pri,
 276             minalloc, maxalloc, NULL, flags));
 277 }
 278 
 279 /*ARGSUSED*/
 280 taskq_t *
 281 taskq_create_proc(const char *name, int nthreads, pri_t pri,
 282         int minalloc, int maxalloc, proc_t *proc, uint_t flags)
 283 {
 284         taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
 285         int t;
 286 
 287         if (flags & TASKQ_THREADS_CPU_PCT) {
 288                 int pct;
 289                 ASSERT3S(nthreads, >=, 0);
 290                 ASSERT3S(nthreads, <=, 100);
 291                 pct = MIN(nthreads, 100);
 292                 pct = MAX(pct, 0);
 293 
 294                 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
 295                 nthreads = MAX(nthreads, 1);    /* need at least 1 thread */
 296         } else {
 297                 ASSERT3S(nthreads, >=, 1);
 298         }
 299 
 300         rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
 301         mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
 302         cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
 303         cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
 304         cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
 305         tq->tq_flags = flags | TASKQ_ACTIVE;
 306         tq->tq_active = nthreads;
 307         tq->tq_nthreads = nthreads;
 308         tq->tq_minalloc = minalloc;
 309         tq->tq_maxalloc = maxalloc;
 310         tq->tq_task.tqent_next = &tq->tq_task;
 311         tq->tq_task.tqent_prev = &tq->tq_task;
 312         tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
 313 
 314         if (flags & TASKQ_PREPOPULATE) {
 315                 mutex_enter(&tq->tq_lock);
 316                 while (minalloc-- > 0)
 317                         task_free(tq, task_alloc(tq, KM_SLEEP));
 318                 mutex_exit(&tq->tq_lock);
 319         }
 320 
 321         for (t = 0; t < nthreads; t++)
 322                 (void) thr_create(0, 0, taskq_thread,
 323                     tq, THR_BOUND, &tq->tq_threadlist[t]);
 324 
 325         return (tq);
 326 }
 327 
/*
 * Tear down tq.  Waits for all queued work to finish, stops and joins the
 * worker threads, frees every cached task entry, and releases tq itself.
 * NOTE(review): callers presumably must not dispatch to tq once this has
 * started; taskq_dispatch() only ASSERTs TASKQ_ACTIVE.
 */
void
taskq_destroy(taskq_t *tq)
{
	int t;
	/*
	 * Saved up front: tq_nthreads drops to 0 as the workers exit, but
	 * thr_join() and freeing tq_threadlist need the original count.
	 */
	int nthreads = tq->tq_nthreads;

	/* Drain: no queued tasks and all workers idle. */
	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	/* Wake every idle worker so it sees TASKQ_ACTIVE cleared and exits. */
	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/*
	 * With tq_minalloc at 0, task_alloc() pops an entry from the free
	 * list and task_free() then releases it, so each iteration frees
	 * one cached entry.  Nothing is queued or running now, so every
	 * remaining entry should be on the free list.
	 */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}
 365 
 366 int
 367 taskq_member(taskq_t *tq, struct _kthread *t)
 368 {
 369         int i;
 370 
 371         if (taskq_now)
 372                 return (1);
 373 
 374         for (i = 0; i < tq->tq_nthreads; i++)
 375                 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
 376                         return (1);
 377 
 378         return (0);
 379 }
 380 
 381 void
 382 system_taskq_init(void)
 383 {
 384         system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
 385             TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 386 }
 387 
 388 void
 389 system_taskq_fini(void)
 390 {
 391         taskq_destroy(system_taskq);
 392         system_taskq = NULL; /* defensive */
 393 }