/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
 */

/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * Copyright 1993 OpenVision Technologies, Inc., All Rights Reserved.
 */

/*      Copyright (c) 1983, 1984, 1985,  1986, 1987, 1988, 1989 AT&T        */
/*        All Rights Reserved   */

/*
 * Portions of this source code were derived from Berkeley 4.3 BSD
 * under license from the Regents of the University of California.
 */

/*
 * Server-side remote procedure call interface.
 *
 * Master transport handle (SVCMASTERXPRT).
 *   The master transport handle structure is shared among service
 *   threads processing events on the transport. Some fields in the
 *   master structure are protected by locks:
 *   - xp_req_lock protects the request queue:
 *      xp_req_head, xp_req_tail, xp_reqs, xp_size, xp_full, xp_enable
 *   - xp_thread_lock protects the thread (clone) counts:
 *      xp_threads, xp_detached_threads, xp_wq
 *   Each master transport is registered to exactly one thread pool.
 *
 * Clone transport handle (SVCXPRT)
 *   The clone transport handle structure is a per-service-thread handle
 *   to the transport. The structure carries all the fields/buffers used
 *   for request processing. A service thread (in other words, a clone
 *   structure) can be linked to an arbitrary master structure to process
 *   requests on this transport. The master handle keeps a reference
 *   count of the threads (clones) linked to it. A service thread can switch
 *   to another transport by unlinking its clone handle from the current
 *   transport and linking to a new one. Switching is relatively inexpensive
 *   but involves taking the master's xprt->xp_thread_lock.
 *
 * Pools.
 *   A pool represents a kernel RPC service (NFS, Lock Manager, etc.).
 *   Transports related to the service are registered to the service pool.
 *   Service threads can switch between different transports in the pool.
 *   Thus, each service has its own pool of service threads. The maximum
 *   number of threads in a pool is pool->p_maxthreads. This limit restricts
 *   the resource usage of the service. Some fields are protected
 *   by locks:
 *   - p_req_lock protects several counts and flags:
 *      p_reqs, p_size, p_walkers, p_asleep, p_drowsy, p_req_cv
 *   - p_thread_lock governs other thread counts:
 *      p_threads, p_detached_threads, p_reserved_threads, p_closing
 *
 *   In addition, each pool contains a doubly-linked list of transports,
 *   an `xprt-ready' queue and a creator thread (see below). Threads in
 *   the pool share some other parameters such as stack size and
 *   polling timeout.
 *
 *   Pools are initialized through the svc_pool_create() function called from
 *   the nfssys() system call. However, thread creation must be done by
 *   the userland agent. This is done by using the SVCPOOL_WAIT and
 *   SVCPOOL_RUN arguments to nfssys(), which call svc_wait() and
 *   svc_do_run(), respectively. Once the pool has been initialized,
 *   the userland process must set up a `creator' thread. This thread
 *   should park itself in the kernel by calling svc_wait(). If
 *   svc_wait() returns successfully, it should fork off a new worker
 *   thread, which then calls svc_do_run() in order to get work. When
 *   that thread is complete, svc_do_run() will return, and the user
 *   program should call thr_exit() (a sketch of this loop follows below).
 *
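 *   A minimal sketch of that userland loop (svcpool_run_thread() is a
 *   hypothetical helper; error handling is omitted; this is not the
 *   verbatim nfsd/lockd code). Each successful return from the
 *   SVCPOOL_WAIT call forks a worker that enters svc_do_run() via the
 *   SVCPOOL_RUN opcode:
 *
 *      while (nfssys(SVCPOOL_WAIT, &id) == 0) {
 *              (void) thr_create(NULL, 0, svcpool_run_thread,
 *                  (void *)&id, THR_BOUND | THR_DETACHED, NULL);
 *      }
 *
 *   where svcpool_run_thread() calls nfssys(SVCPOOL_RUN, &id) and then
 *   thr_exit().
 *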
 *   When we try to register a new pool and there is an old pool with
 *   the same id in the doubly linked pool list (this happens when we kill
 *   and restart nfsd or lockd), then we unlink the old pool from the list
 *   and mark its state as `closing'. After that its transports can still
 *   process requests but new transports won't be registered. When all the
 *   transports and service threads associated with the pool are gone, the
 *   creator thread (see below) will clean up the pool structure and exit.
 *
 * svc_queuereq() and svc_run().
 *   The kernel RPC server is interrupt driven. The svc_queuereq() interrupt
 *   routine is called to deliver an RPC request. The service threads
 *   loop in svc_run(). The interrupt function queues a request on the
 *   transport's queue and makes sure that the request is serviced.
 *   It may either wake up one of the sleeping threads, or ask for a new
 *   thread to be created, or, if the previous request is just being picked
 *   up, do nothing. In the last case the service thread that is picking up
 *   the previous request will wake up or create the next thread. After a
 *   service thread processes a request and sends a reply it returns to
 *   svc_run() and svc_run() calls svc_poll() to find new input.
 *
 * svc_poll().
 *   In order to avoid unnecessary locking, which causes performance
 *   problems, we always look for a pending request on the current transport.
 *   If there is none we take a hint from the pool's `xprt-ready' queue.
 *   If the queue has overflowed we switch to `drain' mode, checking
 *   each transport in the pool's transport list. Once we find a
 *   master transport handle with a pending request we latch the request
 *   lock on this transport and return to svc_run(). If the request
 *   belongs to a transport different from the one the service thread is
 *   linked to we need to unlink and link again.
 *
 *   A service thread goes to sleep when there are no pending
 *   requests on the transports registered with the pool.
 *   All the pool's threads sleep on the same condition variable.
 *   If a thread has been sleeping for too long a period of time
 *   (by default 5 seconds) it wakes up and exits. Also, when a transport
 *   is closing, sleeping threads wake up to unlink from this transport.
 *
 * The `xprt-ready' queue.
 *   If a service thread finds no request on the transport it is currently
 *   linked to, it must find another transport with a pending request. To
 *   make this search more efficient each pool has an `xprt-ready' queue.
 *   The queue is a FIFO. When the interrupt routine queues a request it also
 *   inserts a pointer to the transport into the `xprt-ready' queue. A
 *   thread looking for a transport with a pending request can pop a
 *   transport and check for a request. The request may already be gone
 *   since it could have been taken by a thread linked to that transport.
 *   In such a case we try the next hint. The `xprt-ready' queue has a
 *   fixed size (by default 256 nodes). If it overflows svc_poll() has to
 *   switch to the less efficient but safe `drain' mode and walk through
 *   the pool's transport list.
 *
 *   Both the svc_poll() loop and the `xprt-ready' queue are optimized
 *   for the peak load case, that is, for the situation when the queue is
 *   not empty, there are always a few pending requests, and a service
 *   thread that has just processed a request does not go to sleep but
 *   immediately picks up the next request.
 *
 * Thread creator.
 *   Each pool has a thread creator associated with it. The creator thread
 *   sleeps on a condition variable and waits for a signal to create a
 *   service thread. The actual thread creation is done in userland by
 *   the method described in "Pools" above.
 *
 *   Signaling threads should turn on the `creator signaled' flag, and
 *   can avoid sending signals when the flag is on. The flag is cleared
 *   when the thread is created.
 *
 *   When the pool is in closing state (i.e., it has already been
 *   unregistered from the pool list) the last thread on the last
 *   transport in the pool should turn the p_creator_exit flag on. The
 *   creator thread will clean up the pool structure and exit.
 *
 * Thread reservation; Detaching service threads.
 *   A service thread can detach itself to block for an extended amount
 *   of time. However, to keep the service active we need to guarantee
 *   at least pool->p_redline non-detached threads that can process incoming
 *   requests. Thus, the maximum number of detached and reserved threads is
 *   p->p_maxthreads - p->p_redline. A service thread should first acquire
 *   a reservation, and if the reservation was granted it can detach itself.
 *   If a reservation was granted but the thread does not detach itself
 *   it should cancel the reservation before it returns to svc_run()
 *   (a sketch of the sequence follows below).
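 *
 *   A hedged sketch of that sequence (this assumes svc_reserve_thread(),
 *   svc_unreserve_thread() and svc_detach_thread() are the reservation
 *   and detach entry points; it is not verbatim service code):
 *
 *      if (svc_reserve_thread(clone_xprt)) {
 *              if (will_block_for_long) {
 *                      (void) svc_detach_thread(clone_xprt);
 *                      ... extended blocking operation ...
 *              } else {
 *                      svc_unreserve_thread(clone_xprt);
 *              }
 *      }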
 */

#include <sys/param.h>
#include <sys/types.h>
#include <rpc/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/tiuser.h>
#include <sys/t_kuser.h>
#include <netinet/in.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <sys/proc.h>
#include <sys/user.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/tihdr.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/file.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/vtrace.h>
#include <sys/zone.h>
#include <nfs/nfs.h>
#include <sys/tsol/label_macro.h>

/*
 * Defines for svc_poll()
 */
#define SVC_EXPRTGONE ((SVCMASTERXPRT *)1)      /* Transport is closing */
#define SVC_ETIMEDOUT ((SVCMASTERXPRT *)2)      /* Timeout */
#define SVC_EINTR ((SVCMASTERXPRT *)3)          /* Interrupted by signal */

/*
 * Default stack size for service threads.
 */
#define DEFAULT_SVC_RUN_STKSIZE         (0)     /* default kernel stack */

int    svc_default_stksize = DEFAULT_SVC_RUN_STKSIZE;

/*
 * Default polling timeout for service threads.
 * Multiplied by hz when used.
 */
#define DEFAULT_SVC_POLL_TIMEOUT        (5)     /* seconds */

clock_t svc_default_timeout = DEFAULT_SVC_POLL_TIMEOUT;

/*
 * Size of the `xprt-ready' queue.
 */
#define DEFAULT_SVC_QSIZE               (256)   /* qnodes */

size_t svc_default_qsize = DEFAULT_SVC_QSIZE;

/*
 * Default limit for the number of service threads.
 */
#define DEFAULT_SVC_MAXTHREADS          (INT16_MAX)

int    svc_default_maxthreads = DEFAULT_SVC_MAXTHREADS;

/*
 * Maximum number of requests from the same transport (in `drain' mode).
 */
#define DEFAULT_SVC_MAX_SAME_XPRT       (8)

int    svc_default_max_same_xprt = DEFAULT_SVC_MAX_SAME_XPRT;


/*
 * Default `Redline' of non-detached threads.
 * Total number of detached and reserved threads in an RPC server
 * thread pool is limited to pool->p_maxthreads - svc_redline.
 */
#define DEFAULT_SVC_REDLINE             (1)

int    svc_default_redline = DEFAULT_SVC_REDLINE;

/*
 * A node for the `xprt-ready' queue.
 * See below.
 */
struct __svcxprt_qnode {
        __SVCXPRT_QNODE *q_next;
        SVCMASTERXPRT   *q_xprt;
};

/*
 * Global SVC variables (private).
 */
struct svc_globals {
        SVCPOOL         *svc_pools;
        kmutex_t        svc_plock;
};

/*
 * Debug variable to check for RDMA-based
 * transport startup and cleanup. Controlled
 * through /etc/system. Off by default.
 */
int rdma_check = 0;

/*
 * This allows disabling flow control in svc_queuereq().
 */
volatile int svc_flowcontrol_disable = 0;

/*
 * Authentication parameters list.
 */
static caddr_t rqcred_head;
static kmutex_t rqcred_lock;

/*
 * Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
 */
void    (*rpc_rele)(queue_t *, mblk_t *, bool_t) = NULL;
void    (*mir_rele)(queue_t *, mblk_t *, bool_t) = NULL;

/* ARGSUSED */
void
rpc_rdma_rele(queue_t *q, mblk_t *mp, bool_t enable)
{
}
void    (*rdma_rele)(queue_t *, mblk_t *, bool_t) = rpc_rdma_rele;


/*
 * This macro picks which `rele' routine to use, based on the transport type.
 */
#define RELE_PROC(xprt) \
        ((xprt)->xp_type == T_RDMA ? rdma_rele : \
        (((xprt)->xp_type == T_CLTS) ? rpc_rele : mir_rele))
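
/*
 * Hedged usage note: callers are expected to invoke the selected routine
 * as, for example, (*RELE_PROC(xprt))(xprt->xp_wq, mp, enable); the
 * actual call sites live in the request processing code that uses these
 * transports.
 */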

/*
 * If true, then keep quiet about version mismatch.
 * This macro is for broadcast RPC only. We have no broadcast RPC in
 * kernel now but one may define a flag in the transport structure
 * and redefine this macro.
 */
#define version_keepquiet(xprt) (FALSE)

/*
 * ZSD key used to retrieve zone-specific svc globals
 */
static zone_key_t svc_zone_key;

static void svc_callout_free(SVCMASTERXPRT *);
static void svc_xprt_qinit(SVCPOOL *, size_t);
static void svc_xprt_qdestroy(SVCPOOL *);
static void svc_thread_creator(SVCPOOL *);
static void svc_creator_signal(SVCPOOL *);
static void svc_creator_signalexit(SVCPOOL *);
static void svc_pool_unregister(struct svc_globals *, SVCPOOL *);
static int svc_run(SVCPOOL *);

/* ARGSUSED */
static void *
svc_zoneinit(zoneid_t zoneid)
{
        struct svc_globals *svc;

        svc = kmem_alloc(sizeof (*svc), KM_SLEEP);
        mutex_init(&svc->svc_plock, NULL, MUTEX_DEFAULT, NULL);
        svc->svc_pools = NULL;
        return (svc);
}

/* ARGSUSED */
static void
svc_zoneshutdown(zoneid_t zoneid, void *arg)
{
        struct svc_globals *svc = arg;
        SVCPOOL *pool;

        mutex_enter(&svc->svc_plock);
        while ((pool = svc->svc_pools) != NULL) {
                svc_pool_unregister(svc, pool);
        }
        mutex_exit(&svc->svc_plock);
}

/* ARGSUSED */
static void
svc_zonefini(zoneid_t zoneid, void *arg)
{
        struct svc_globals *svc = arg;

        ASSERT(svc->svc_pools == NULL);
        mutex_destroy(&svc->svc_plock);
        kmem_free(svc, sizeof (*svc));
}

/*
 * Global SVC init routine.
 * Initialize global generic and transport type specific structures
 * used by the kernel RPC server side. This routine is called only
 * once when the module is being loaded.
 */
void
svc_init()
{
        zone_key_create(&svc_zone_key, svc_zoneinit, svc_zoneshutdown,
            svc_zonefini);
        svc_cots_init();
        svc_clts_init();
}

/*
 * Destroy the SVCPOOL structure.
 */
static void
svc_pool_cleanup(SVCPOOL *pool)
{
        ASSERT(pool->p_threads + pool->p_detached_threads == 0);
        ASSERT(pool->p_lcount == 0);
        ASSERT(pool->p_closing);

        /*
         * Call the user supplied shutdown function.  This is done
         * here so the user of the pool will be able to clean up
         * service-related resources.
         */
        if (pool->p_shutdown != NULL)
                (pool->p_shutdown)();

        /* Destroy `xprt-ready' queue */
        svc_xprt_qdestroy(pool);

        /* Destroy transport list */
        rw_destroy(&pool->p_lrwlock);

        /* Destroy locks and condition variables */
        mutex_destroy(&pool->p_thread_lock);
        mutex_destroy(&pool->p_req_lock);
        cv_destroy(&pool->p_req_cv);

        /* Destroy creator's locks and condition variables */
        mutex_destroy(&pool->p_creator_lock);
        cv_destroy(&pool->p_creator_cv);
        mutex_destroy(&pool->p_user_lock);
        cv_destroy(&pool->p_user_cv);

        /* Free pool structure */
        kmem_free(pool, sizeof (SVCPOOL));
}

/*
 * If all the transports and service threads are already gone
 * signal the creator thread to clean up and exit.
 */
static bool_t
svc_pool_tryexit(SVCPOOL *pool)
{
        ASSERT(MUTEX_HELD(&pool->p_thread_lock));
        ASSERT(pool->p_closing);

        if (pool->p_threads + pool->p_detached_threads == 0) {
                rw_enter(&pool->p_lrwlock, RW_READER);
                if (pool->p_lcount == 0) {
                        /*
                         * Release the locks before sending a signal.
                         */
                        rw_exit(&pool->p_lrwlock);
                        mutex_exit(&pool->p_thread_lock);

                        /*
                         * Notify the creator thread to clean up and exit
                         *
                         * NOTICE: No references to the pool beyond this point!
                         * The pool is being destroyed.
                         */
                        ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
                        svc_creator_signalexit(pool);

                        return (TRUE);
                }
                rw_exit(&pool->p_lrwlock);
        }

        ASSERT(MUTEX_HELD(&pool->p_thread_lock));
        return (FALSE);
}

/*
 * Find a pool with a given id.
 */
static SVCPOOL *
svc_pool_find(struct svc_globals *svc, int id)
{
        SVCPOOL *pool;

        ASSERT(MUTEX_HELD(&svc->svc_plock));

        /*
         * Search the list for a pool with a matching id.
         */
        for (pool = svc->svc_pools; pool; pool = pool->p_next)
                if (pool->p_id == id)
                        return (pool);

        return (NULL);
}

/*
 * PSARC 2003/523 Contract Private Interface
 * svc_do_run
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
int
svc_do_run(int id)
{
        SVCPOOL *pool;
        int err = 0;
        struct svc_globals *svc;

        svc = zone_getspecific(svc_zone_key, curproc->p_zone);
        mutex_enter(&svc->svc_plock);

        pool = svc_pool_find(svc, id);

        mutex_exit(&svc->svc_plock);

        if (pool == NULL)
                return (ENOENT);

        /*
         * Increment counter of pool threads now
         * that a thread has been created.
         */
        mutex_enter(&pool->p_thread_lock);
        pool->p_threads++;
        mutex_exit(&pool->p_thread_lock);

        /* Give work to the new thread. */
        err = svc_run(pool);

        return (err);
}

/*
 * Unregister a pool from the pool list.
 * Set the closing state. If all the transports and service threads
 * are already gone signal the creator thread to clean up and exit.
 */
static void
svc_pool_unregister(struct svc_globals *svc, SVCPOOL *pool)
{
        SVCPOOL *next = pool->p_next;
        SVCPOOL *prev = pool->p_prev;

        ASSERT(MUTEX_HELD(&svc->svc_plock));

        /* Remove from the list */
        if (pool == svc->svc_pools)
                svc->svc_pools = next;
        if (next)
                next->p_prev = prev;
        if (prev)
                prev->p_next = next;
        pool->p_next = pool->p_prev = NULL;

        /*
         * Offline the pool. Mark the pool as closing.
         * If there are no transports in this pool notify
         * the creator thread to clean it up and exit.
         */
        mutex_enter(&pool->p_thread_lock);
        if (pool->p_offline != NULL)
                (pool->p_offline)();
        pool->p_closing = TRUE;
        if (svc_pool_tryexit(pool))
                return;
        mutex_exit(&pool->p_thread_lock);
}

/*
 * Register a pool with a given id in the global doubly linked pool list.
 * - if there is a pool with the same id in the list then unregister it
 * - insert the new pool into the list.
 */
static void
svc_pool_register(struct svc_globals *svc, SVCPOOL *pool, int id)
{
        SVCPOOL *old_pool;

        /*
         * If there is a pool with the same id then remove it from
         * the list and mark the pool as closing.
         */
        mutex_enter(&svc->svc_plock);

        if ((old_pool = svc_pool_find(svc, id)) != NULL)
                svc_pool_unregister(svc, old_pool);

        /* Insert into the doubly linked list */
        pool->p_id = id;
        pool->p_next = svc->svc_pools;
        pool->p_prev = NULL;
        if (svc->svc_pools)
                svc->svc_pools->p_prev = pool;
        svc->svc_pools = pool;

        mutex_exit(&svc->svc_plock);
}

/*
 * Initialize a newly created pool structure
 */
static int
svc_pool_init(SVCPOOL *pool, uint_t maxthreads, uint_t redline,
        uint_t qsize, uint_t timeout, uint_t stksize, uint_t max_same_xprt)
{
        klwp_t *lwp = ttolwp(curthread);

        ASSERT(pool);

        if (maxthreads == 0)
                maxthreads = svc_default_maxthreads;
        if (redline == 0)
                redline = svc_default_redline;
        if (qsize == 0)
                qsize = svc_default_qsize;
        if (timeout == 0)
                timeout = svc_default_timeout;
        if (stksize == 0)
                stksize = svc_default_stksize;
        if (max_same_xprt == 0)
                max_same_xprt = svc_default_max_same_xprt;

        if (maxthreads < redline)
                return (EINVAL);

        /* Allocate and initialize the `xprt-ready' queue */
        svc_xprt_qinit(pool, qsize);

        /* Initialize doubly-linked xprt list */
        rw_init(&pool->p_lrwlock, NULL, RW_DEFAULT, NULL);

        /*
         * Set lwp_childstksz on the current lwp so that
         * descendants of this lwp get the modified stack size, if
         * it is defined. It is important that either this lwp or
         * one of its descendants do the actual service pool thread
         * creation to maintain the stack size inheritance.
         */
        if (lwp != NULL)
                lwp->lwp_childstksz = stksize;

        /* Initialize thread limits, locks and condition variables */
        pool->p_maxthreads = maxthreads;
        pool->p_redline = redline;
        pool->p_timeout = timeout * hz;
        pool->p_stksize = stksize;
        pool->p_max_same_xprt = max_same_xprt;
        mutex_init(&pool->p_thread_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&pool->p_req_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&pool->p_req_cv, NULL, CV_DEFAULT, NULL);

        /* Initialize userland creator */
        pool->p_user_exit = FALSE;
        pool->p_signal_create_thread = FALSE;
        pool->p_user_waiting = FALSE;
        mutex_init(&pool->p_user_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&pool->p_user_cv, NULL, CV_DEFAULT, NULL);

        /* Initialize the creator and start the creator thread */
        pool->p_creator_exit = FALSE;
        mutex_init(&pool->p_creator_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&pool->p_creator_cv, NULL, CV_DEFAULT, NULL);

        (void) zthread_create(NULL, pool->p_stksize, svc_thread_creator,
            pool, 0, minclsyspri);

        return (0);
}

/*
 * PSARC 2003/523 Contract Private Interface
 * svc_pool_create
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 *
 * Create a kernel RPC server-side thread/transport pool.
 *
 * This is the public interface for creation of a server RPC thread pool
 * for a given service provider. Transports registered with the pool's id
 * will be served by the pool's threads. This function is called from the
 * nfssys() system call.
 */
int
svc_pool_create(struct svcpool_args *args)
{
        SVCPOOL *pool;
        int error;
        struct svc_globals *svc;

        /*
         * Caller should check credentials in a way appropriate
         * in the context of the call.
         */

        svc = zone_getspecific(svc_zone_key, curproc->p_zone);
        /* Allocate a new pool */
        pool = kmem_zalloc(sizeof (SVCPOOL), KM_SLEEP);

        /*
         * Initialize the pool structure and create a creator thread.
         */
        error = svc_pool_init(pool, args->maxthreads, args->redline,
            args->qsize, args->timeout, args->stksize, args->max_same_xprt);

        if (error) {
                kmem_free(pool, sizeof (SVCPOOL));
                return (error);
        }

        /* Register the pool with the global pool list */
        svc_pool_register(svc, pool, args->id);

        return (0);
}

int
svc_pool_control(int id, int cmd, void *arg)
{
        SVCPOOL *pool;
        struct svc_globals *svc;

        svc = zone_getspecific(svc_zone_key, curproc->p_zone);

        switch (cmd) {
        case SVCPSET_SHUTDOWN_PROC:
                /*
                 * Search the list for a pool with a matching id
                 * and register the shutdown procedure with that pool.
                 */
                mutex_enter(&svc->svc_plock);

                if ((pool = svc_pool_find(svc, id)) == NULL) {
                        mutex_exit(&svc->svc_plock);
                        return (ENOENT);
                }
                /*
                 * Grab the transport list lock before releasing the
                 * pool list lock
                 */
                rw_enter(&pool->p_lrwlock, RW_WRITER);
                mutex_exit(&svc->svc_plock);

                pool->p_shutdown = *((void (*)())arg);

                rw_exit(&pool->p_lrwlock);

                return (0);
        case SVCPSET_UNREGISTER_PROC:
                /*
                 * Search the list for a pool with a matching id
                 * and register the unregister callback with that pool.
                 */
                mutex_enter(&svc->svc_plock);

                if ((pool = svc_pool_find(svc, id)) == NULL) {
                        mutex_exit(&svc->svc_plock);
                        return (ENOENT);
                }
                /*
                 * Grab the transport list lock before releasing the
                 * pool list lock
                 */
                rw_enter(&pool->p_lrwlock, RW_WRITER);
                mutex_exit(&svc->svc_plock);

                pool->p_offline = *((void (*)())arg);

                rw_exit(&pool->p_lrwlock);

                return (0);
        default:
                return (EINVAL);
        }
}

/*
 * Pool's transport list manipulation routines.
 * - svc_xprt_register()
 * - svc_xprt_unregister()
 *
 * svc_xprt_register() is called from svc_tli_kcreate() to
 * insert a new master transport handle into the doubly linked
 * list of server transport handles (one list per pool).
 *
 * The list is used by svc_poll(), when it operates in `drain'
 * mode, to search for the next transport with a pending request.
 */

int
svc_xprt_register(SVCMASTERXPRT *xprt, int id)
{
        SVCMASTERXPRT *prev, *next;
        SVCPOOL *pool;
        struct svc_globals *svc;

        svc = zone_getspecific(svc_zone_key, curproc->p_zone);
        /*
         * Search the list for a pool with a matching id
         * and register the transport handle with that pool.
         */
        mutex_enter(&svc->svc_plock);

        if ((pool = svc_pool_find(svc, id)) == NULL) {
                mutex_exit(&svc->svc_plock);
                return (ENOENT);
        }

        /* Grab the transport list lock before releasing the pool list lock */
        rw_enter(&pool->p_lrwlock, RW_WRITER);
        mutex_exit(&svc->svc_plock);

        /* Don't register new transports when the pool is in closing state */
        if (pool->p_closing) {
                rw_exit(&pool->p_lrwlock);
                return (EBUSY);
        }

        /*
         * Initialize xp_pool to point to the pool.
         * We don't want to go through the pool list every time.
         */
        xprt->xp_pool = pool;

        /*
         * Insert a transport handle into the list.
         * The list head points to the most recently inserted transport.
         */
        if (pool->p_lhead == NULL)
                pool->p_lhead = xprt->xp_prev = xprt->xp_next = xprt;
        else {
                next = pool->p_lhead;
                prev = pool->p_lhead->xp_prev;

                xprt->xp_next = next;
                xprt->xp_prev = prev;

                pool->p_lhead = prev->xp_next = next->xp_prev = xprt;
        }

        /* Increment the transports count */
        pool->p_lcount++;

        rw_exit(&pool->p_lrwlock);
        return (0);
}

/*
 * Called from svc_xprt_cleanup() to remove a master transport handle
 * from the pool's list of server transports (when a transport is
 * being destroyed).
 */
void
svc_xprt_unregister(SVCMASTERXPRT *xprt)
{
        SVCPOOL *pool = xprt->xp_pool;

        /*
         * Unlink xprt from the list.
         * If the list head points to this xprt then move it
         * to the next xprt or reset to NULL if this is the last
         * xprt in the list.
         */
        rw_enter(&pool->p_lrwlock, RW_WRITER);

        if (xprt == xprt->xp_next)
                pool->p_lhead = NULL;
        else {
                SVCMASTERXPRT *next = xprt->xp_next;
                SVCMASTERXPRT *prev = xprt->xp_prev;

                next->xp_prev = prev;
                prev->xp_next = next;

                if (pool->p_lhead == xprt)
                        pool->p_lhead = next;
        }

        xprt->xp_next = xprt->xp_prev = NULL;

        /* Decrement list count */
        pool->p_lcount--;

        rw_exit(&pool->p_lrwlock);
}

static void
svc_xprt_qdestroy(SVCPOOL *pool)
{
        mutex_destroy(&pool->p_qend_lock);
        kmem_free(pool->p_qbody, pool->p_qsize * sizeof (__SVCXPRT_QNODE));
}

/*
 * Initialize an `xprt-ready' queue for a given pool.
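 * The q_next pointers link the qnodes into a ring: the queue is empty
 * when p_qend == p_qtop, and it is treated as full when
 * p_qtop->q_next == p_qend (see svc_xprt_qput() below).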
 */
static void
svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
{
        int i;

        pool->p_qsize = qsize;
        pool->p_qbody = kmem_zalloc(pool->p_qsize * sizeof (__SVCXPRT_QNODE),
            KM_SLEEP);

        for (i = 0; i < pool->p_qsize - 1; i++)
                pool->p_qbody[i].q_next = &(pool->p_qbody[i+1]);

        pool->p_qbody[pool->p_qsize-1].q_next = &(pool->p_qbody[0]);
        pool->p_qtop = &(pool->p_qbody[0]);
        pool->p_qend = &(pool->p_qbody[0]);

        mutex_init(&pool->p_qend_lock, NULL, MUTEX_DEFAULT, NULL);
}

/*
 * Called from the svc_queuereq() interrupt routine to queue
 * a hint for svc_poll() which transport has a pending request.
 * - insert a pointer to xprt into the xprt-ready queue (FIFO)
 * - if the xprt-ready queue is full turn the overflow flag on.
 *
 * NOTICE: pool->p_qtop is protected by the pool's request lock
 * and the caller (svc_queuereq()) must hold the lock.
 */
static void
svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
        ASSERT(MUTEX_HELD(&pool->p_req_lock));

        /* If the overflow flag is on there is nothing we can do */
        if (pool->p_qoverflow)
                return;

        /* If the queue is full turn the overflow flag on and exit */
        if (pool->p_qtop->q_next == pool->p_qend) {
                mutex_enter(&pool->p_qend_lock);
                if (pool->p_qtop->q_next == pool->p_qend) {
                        pool->p_qoverflow = TRUE;
                        mutex_exit(&pool->p_qend_lock);
                        return;
                }
                mutex_exit(&pool->p_qend_lock);
        }

        /* Insert a hint and move pool->p_qtop */
        pool->p_qtop->q_xprt = xprt;
        pool->p_qtop = pool->p_qtop->q_next;
}

/*
 * Called from svc_poll() to get a hint which transport has a
 * pending request. Returns a pointer to a transport or NULL if the
 * `xprt-ready' queue is empty.
 *
 * Since we do not acquire the pool's request lock while checking if
 * the queue is empty we may miss a request that is just being delivered.
 * However, this is OK since svc_poll() will retry again until the
 * count indicates that there are pending requests for this pool.
 */
static SVCMASTERXPRT *
svc_xprt_qget(SVCPOOL *pool)
{
        SVCMASTERXPRT *xprt;

        mutex_enter(&pool->p_qend_lock);
        do {
                /*
                 * If the queue is empty return NULL.
                 * Since we do not acquire the pool's request lock which
                 * protects pool->p_qtop this is not an exact check. However,
                 * this is safe - if we miss a request here svc_poll()
                 * will retry again.
                 */
                if (pool->p_qend == pool->p_qtop) {
                        mutex_exit(&pool->p_qend_lock);
                        return (NULL);
                }

                /* Get a hint and move pool->p_qend */
                xprt = pool->p_qend->q_xprt;
                pool->p_qend = pool->p_qend->q_next;

                /* Skip fields deleted by svc_xprt_qdelete() */
        } while (xprt == NULL);
        mutex_exit(&pool->p_qend_lock);

        return (xprt);
}

/*
 * Delete all the references to a transport handle that
 * is being destroyed from the xprt-ready queue.
 * Deleted pointers are replaced with NULLs.
 */
static void
svc_xprt_qdelete(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
        __SVCXPRT_QNODE *q;

        mutex_enter(&pool->p_req_lock);
        for (q = pool->p_qend; q != pool->p_qtop; q = q->q_next) {
                if (q->q_xprt == xprt)
                        q->q_xprt = NULL;
        }
        mutex_exit(&pool->p_req_lock);
}

/*
 * Destructor for a master server transport handle.
 * - if there are no more non-detached threads linked to this transport
 *   then, if requested, call xp_closeproc (we don't wait for detached
 *   threads linked to this transport to complete).
 * - if there are no more threads linked to this
 *   transport then
 *   a) remove references to this transport from the xprt-ready queue
 *   b) remove a reference to this transport from the pool's transport list
 *   c) call a transport specific `destroy' function
 *   d) cancel remaining thread reservations.
 *
 * NOTICE: Caller must hold the transport's thread lock.
 */
static void
svc_xprt_cleanup(SVCMASTERXPRT *xprt, bool_t detached)
{
        ASSERT(MUTEX_HELD(&xprt->xp_thread_lock));
        ASSERT(xprt->xp_wq == NULL);

        /*
         * If called from the last non-detached thread
         * it should call the closeproc on this transport.
         */
        if (!detached && xprt->xp_threads == 0 && xprt->xp_closeproc) {
                (*(xprt->xp_closeproc)) (xprt);
        }

        if (xprt->xp_threads + xprt->xp_detached_threads > 0)
                mutex_exit(&xprt->xp_thread_lock);
        else {
                /* Remove references to xprt from the `xprt-ready' queue */
                svc_xprt_qdelete(xprt->xp_pool, xprt);

                /* Unregister xprt from the pool's transport list */
                svc_xprt_unregister(xprt);
                svc_callout_free(xprt);
                SVC_DESTROY(xprt);
        }
}

/*
 * Find a dispatch routine for a given prog/vers pair.
 * This function is called from svc_getreq() to search the callout
 * table for an entry with a matching RPC program number `prog'
 * and a version range that covers `vers'.
 * - if it finds a matching entry it returns a pointer to the dispatch routine
 * - otherwise it returns NULL and fills `vers_min' and `vers_max'
 *   with, respectively, the lowest version and the highest version
 *   supported for the program `prog'
 */
static SVC_DISPATCH *
svc_callout_find(SVCXPRT *xprt, rpcprog_t prog, rpcvers_t vers,
    rpcvers_t *vers_min, rpcvers_t *vers_max)
{
        SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
        int i;

        *vers_min = ~(rpcvers_t)0;
        *vers_max = 0;

        for (i = 0; i < sct->sct_size; i++) {
                SVC_CALLOUT *sc = &sct->sct_sc[i];

                if (prog == sc->sc_prog) {
                        if (vers >= sc->sc_versmin && vers <= sc->sc_versmax)
                                return (sc->sc_dispatch);

                        if (*vers_max < sc->sc_versmax)
                                *vers_max = sc->sc_versmax;
                        if (*vers_min > sc->sc_versmin)
                                *vers_min = sc->sc_versmin;
                }
        }

        return (NULL);
}

/*
 * Optionally free callout table allocated for this transport by
 * the service provider.
 */
static void
svc_callout_free(SVCMASTERXPRT *xprt)
{
        SVC_CALLOUT_TABLE *sct = xprt->xp_sct;

        if (sct->sct_free) {
                kmem_free(sct->sct_sc, sct->sct_size * sizeof (SVC_CALLOUT));
                kmem_free(sct, sizeof (SVC_CALLOUT_TABLE));
        }
}

/*
 * Send a reply to an RPC request
 *
 * PSARC 2003/523 Contract Private Interface
 * svc_sendreply
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
bool_t
svc_sendreply(const SVCXPRT *clone_xprt, const xdrproc_t xdr_results,
    const caddr_t xdr_location)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = SUCCESS;
        rply.acpted_rply.ar_results.where = xdr_location;
        rply.acpted_rply.ar_results.proc = xdr_results;

        return (SVC_REPLY((SVCXPRT *)clone_xprt, &rply));
}

/*
 * No procedure error reply
 *
 * PSARC 2003/523 Contract Private Interface
 * svcerr_noproc
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
void
svcerr_noproc(const SVCXPRT *clone_xprt)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = PROC_UNAVAIL;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Can't decode arguments error reply
 *
 * PSARC 2003/523 Contract Private Interface
 * svcerr_decode
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
void
svcerr_decode(const SVCXPRT *clone_xprt)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = GARBAGE_ARGS;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Some system error
 */
void
svcerr_systemerr(const SVCXPRT *clone_xprt)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = SYSTEM_ERR;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(const SVCXPRT *clone_xprt, const enum auth_stat why)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_DENIED;
        rply.rjcted_rply.rj_stat = AUTH_ERROR;
        rply.rjcted_rply.rj_why = why;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Authentication too weak error reply
 */
void
svcerr_weakauth(const SVCXPRT *clone_xprt)
{
        svcerr_auth((SVCXPRT *)clone_xprt, AUTH_TOOWEAK);
}

/*
 * Authentication error; bad credentials
 */
void
svcerr_badcred(const SVCXPRT *clone_xprt)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_DENIED;
        rply.rjcted_rply.rj_stat = AUTH_ERROR;
        rply.rjcted_rply.rj_why = AUTH_BADCRED;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Program unavailable error reply
 *
 * PSARC 2003/523 Contract Private Interface
 * svcerr_noprog
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
void
svcerr_noprog(const SVCXPRT *clone_xprt)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = PROG_UNAVAIL;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Program version mismatch error reply
 *
 * PSARC 2003/523 Contract Private Interface
 * svcerr_progvers
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523@sun.com
 */
void
svcerr_progvers(const SVCXPRT *clone_xprt,
    const rpcvers_t low_vers, const rpcvers_t high_vers)
{
        struct rpc_msg rply;

        rply.rm_direction = REPLY;
        rply.rm_reply.rp_stat = MSG_ACCEPTED;
        rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
        rply.acpted_rply.ar_stat = PROG_MISMATCH;
        rply.acpted_rply.ar_vers.low = low_vers;
        rply.acpted_rply.ar_vers.high = high_vers;
        SVC_FREERES((SVCXPRT *)clone_xprt);
        SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
}

/*
 * Get server side input from some transport.
 *
 * Statement of authentication parameters management:
 * This function owns and manages all authentication parameters, specifically
 * the "raw" parameters (msg.rm_call.cb_cred and msg.rm_call.cb_verf) and
 * the "cooked" credentials (rqst->rq_clntcred).
 * However, this function does not know the structure of the cooked
 * credentials, so it makes the following assumptions:
 *   a) the structure is contiguous (no pointers), and
 *   b) the cred structure size does not exceed RQCRED_SIZE bytes.
 * In all events, all three parameters are freed upon exit from this routine.
 * The storage is trivially managed on the call stack in user land, but
 * is malloced in kernel land.
 *
 * Note: the xprt's xp_svc_lock is not held while the service's dispatch
 * routine is running.  If we decide to implement svc_unregister(), we'll
 * need to decide whether it's okay for a thread to unregister a service
 * while a request is being processed.  If we decide that this is a
 * problem, we can probably use some sort of reference counting scheme to
 * keep the callout entry from going away until the request has completed.
 */
static void
svc_getreq(
        SVCXPRT *clone_xprt,    /* clone transport handle */
        mblk_t *mp)
{
        struct rpc_msg msg;
        struct svc_req r;
        char  *cred_area;       /* too big to allocate on call stack */

        TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_START,
            "svc_getreq_start:");

        ASSERT(clone_xprt->xp_master != NULL);
        ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
            mp->b_datap->db_type != M_DATA);

        /*
         * Firstly, allocate the authentication parameters' storage
         */
        mutex_enter(&rqcred_lock);
        if (rqcred_head) {
                cred_area = rqcred_head;

                /* LINTED pointer alignment */
                rqcred_head = *(caddr_t *)rqcred_head;
                mutex_exit(&rqcred_lock);
        } else {
                mutex_exit(&rqcred_lock);
                cred_area = kmem_alloc(2 * MAX_AUTH_BYTES + RQCRED_SIZE,
                    KM_SLEEP);
        }
        msg.rm_call.cb_cred.oa_base = cred_area;
        msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
        r.rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);

        /*
         * The underlying transport's recv routine may modify the mblk
         * data and make it difficult to extract the label afterwards,
         * so get the label from the raw mblk data now.
         */
        if (is_system_labeled()) {
                cred_t *cr;

                r.rq_label = kmem_alloc(sizeof (bslabel_t), KM_SLEEP);
                cr = msg_getcred(mp, NULL);
                ASSERT(cr != NULL);

                bcopy(label2bslabel(crgetlabel(cr)), r.rq_label,
                    sizeof (bslabel_t));
        } else {
                r.rq_label = NULL;
        }

        /*
         * Now receive a message from the transport.
         */
        if (SVC_RECV(clone_xprt, mp, &msg)) {
                void (*dispatchroutine) (struct svc_req *, SVCXPRT *);
                rpcvers_t vers_min;
                rpcvers_t vers_max;
                bool_t no_dispatch;
                enum auth_stat why;

                /*
                 * Find the registered program and call its
                 * dispatch routine.
                 */
                r.rq_xprt = clone_xprt;
                r.rq_prog = msg.rm_call.cb_prog;
                r.rq_vers = msg.rm_call.cb_vers;
                r.rq_proc = msg.rm_call.cb_proc;
                r.rq_cred = msg.rm_call.cb_cred;

                /*
                 * First authenticate the message.
                 */
                TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_START,
                    "svc_getreq_auth_start:");
                if ((why = sec_svc_msg(&r, &msg, &no_dispatch)) != AUTH_OK) {
                        TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
                            "svc_getreq_auth_end:(%S)", "failed");
                        svcerr_auth(clone_xprt, why);
                        /*
                         * Free the arguments.
                         */
                        (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
                } else if (no_dispatch) {
                        /*
                         * XXX - when bug id 4053736 is done, remove
                         * the SVC_FREEARGS() call.
                         */
                        (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
                } else {
                        TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
                            "svc_getreq_auth_end:(%S)", "good");

                        dispatchroutine = svc_callout_find(clone_xprt,
                            r.rq_prog, r.rq_vers, &vers_min, &vers_max);

                        if (dispatchroutine) {
                                (*dispatchroutine) (&r, clone_xprt);
                        } else {
                                /*
                                 * If we got here, the program or version
                                 * is not served ...
                                 */
                                if (vers_max == 0 ||
                                    version_keepquiet(clone_xprt))
                                        svcerr_noprog(clone_xprt);
                                else
                                        svcerr_progvers(clone_xprt, vers_min,
                                            vers_max);

                                /*
                                 * Free the arguments. For successful calls
                                 * this is done by the dispatch routine.
                                 */
                                (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
                                /* Fall through to ... */
                        }
                        /*
                         * Call cleanup procedure for RPCSEC_GSS.
                         * This is a hack since there is currently no
                         * op, such as SVC_CLEANAUTH. rpc_gss_cleanup
                         * should only be called for a non null proc.
                         * Null procs in RPC GSS are overloaded to
                         * provide context setup and control. The main
                         * purpose of rpc_gss_cleanup is to decrement the
                         * reference count associated with the cached
                         * GSS security context. We should never get here
                         * for an RPCSEC_GSS null proc since *no_dispatch
                         * would have been set to true from sec_svc_msg above.
                         */
                        if (r.rq_cred.oa_flavor == RPCSEC_GSS)
                                rpc_gss_cleanup(clone_xprt);
                }
        }

        if (r.rq_label != NULL)
                kmem_free(r.rq_label, sizeof (bslabel_t));

        /*
         * Free authentication parameters' storage
         */
        mutex_enter(&rqcred_lock);
        /* LINTED pointer alignment */
        *(caddr_t *)cred_area = rqcred_head;
        rqcred_head = cred_area;
        mutex_exit(&rqcred_lock);
}

/*
 * Allocate a new clone transport handle.
 */
SVCXPRT *
svc_clone_init(void)
{
        SVCXPRT *clone_xprt;

        clone_xprt = kmem_zalloc(sizeof (SVCXPRT), KM_SLEEP);
        clone_xprt->xp_cred = crget();
        return (clone_xprt);
}

/*
 * Free memory allocated by svc_clone_init.
 */
void
svc_clone_free(SVCXPRT *clone_xprt)
{
        /* Free credentials from crget() */
        if (clone_xprt->xp_cred)
                crfree(clone_xprt->xp_cred);
        kmem_free(clone_xprt, sizeof (SVCXPRT));
}

/*
 * Link a per-thread clone transport handle to a master
 * - increment a thread reference count on the master
 * - copy some of the master's fields to the clone
 * - call a transport specific clone routine.
 */
void
svc_clone_link(SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt, SVCXPRT *clone_xprt2)
{
        cred_t *cred = clone_xprt->xp_cred;

        ASSERT(cred);

        /*
         * Bump up master's thread count.
         * Linking a per-thread clone transport handle to a master
         * associates a service thread with the master.
         */
        mutex_enter(&xprt->xp_thread_lock);
        xprt->xp_threads++;
        mutex_exit(&xprt->xp_thread_lock);

        /* Clear everything */
        bzero(clone_xprt, sizeof (SVCXPRT));

        /* Set pointer to the master transport structure */
        clone_xprt->xp_master = xprt;

        /* Structure copy of all the common fields */
        clone_xprt->xp_xpc = xprt->xp_xpc;

        /* Restore per-thread fields (xp_cred) */
        clone_xprt->xp_cred = cred;

        if (clone_xprt2)
                SVC_CLONE_XPRT(clone_xprt2, clone_xprt);
}
1497 
1498 /*
1499  * Unlink a non-detached clone transport handle from a master
1500  * - decrement a thread reference count on the master
1501  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
1502  *   if this is the last non-detached thread, or the last thread overall,
1503  *   on this transport then it will close or destroy the transport
1504  * - call transport specific function to destroy the clone handle
1505  * - clear xp_master to avoid recursion.
1506  */
1507 void
1508 svc_clone_unlink(SVCXPRT *clone_xprt)
1509 {
1510         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
1511 
1512         /* This cannot be a detached thread */
1513         ASSERT(!clone_xprt->xp_detached);
1514         ASSERT(xprt->xp_threads > 0);
1515 
1516         /* Decrement a reference count on the transport */
1517         mutex_enter(&xprt->xp_thread_lock);
1518         xprt->xp_threads--;
1519 
1520         /* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
1521         if (xprt->xp_wq)
1522                 mutex_exit(&xprt->xp_thread_lock);
1523         else
1524                 svc_xprt_cleanup(xprt, FALSE);
1525 
1526         /* Call a transport specific clone `destroy' function */
1527         SVC_CLONE_DESTROY(clone_xprt);
1528 
1529         /* Clear xp_master */
1530         clone_xprt->xp_master = NULL;
1531 }
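
     /*
      * An illustrative sketch (not quoted from any existing caller) of
      * the typical life cycle of a clone handle, as used by svc_run()
      * below:
      *
      *		clone_xprt = svc_clone_init();
      *		svc_clone_link(master, clone_xprt, NULL);
      *		... process requests on the master transport ...
      *		svc_clone_unlink(clone_xprt);
      *		svc_clone_free(clone_xprt);
      */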
1532 
1533 /*
1534  * Unlink a detached clone transport handle from a master
1535  * - decrement the thread count on the master
1536  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
1537  *   if this is the last thread on this transport then it will destroy
1538  *   the transport.
1539  * - call a transport specific function to destroy the clone handle
1540  * - clear xp_master to avoid recursion.
1541  */
1542 static void
1543 svc_clone_unlinkdetached(SVCXPRT *clone_xprt)
1544 {
1545         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
1546 
1547         /* This must be a detached thread */
1548         ASSERT(clone_xprt->xp_detached);
1549         ASSERT(xprt->xp_detached_threads > 0);
1550         ASSERT(xprt->xp_threads + xprt->xp_detached_threads > 0);
1551 
1552         /* Grab xprt->xp_thread_lock and decrement link counts */
1553         mutex_enter(&xprt->xp_thread_lock);
1554         xprt->xp_detached_threads--;
1555 
1556         /* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
1557         if (xprt->xp_wq)
1558                 mutex_exit(&xprt->xp_thread_lock);
1559         else
1560                 svc_xprt_cleanup(xprt, TRUE);
1561 
1562         /* Call transport specific clone `destroy' function */
1563         SVC_CLONE_DESTROY(clone_xprt);
1564 
1565         /* Clear xp_master */
1566         clone_xprt->xp_master = NULL;
1567 }
1568 
1569 /*
1570  * Try to exit a non-detached service thread
1571  * - check if there are enough threads left
1572  * - if this thread (i.e. its clone transport handle) is linked
1573  *   to a master transport then unlink it
1574  * - free the clone structure
1575  * - return to userland for thread exit
1576  *
1577  * If this is the last non-detached or the last thread on this
1578  * transport then the call to svc_clone_unlink() will, respectively,
1579  * close and/or destroy the transport.
1580  */
1581 static void
1582 svc_thread_exit(SVCPOOL *pool, SVCXPRT *clone_xprt)
1583 {
1584         if (clone_xprt->xp_master)
1585                 svc_clone_unlink(clone_xprt);
1586         svc_clone_free(clone_xprt);
1587 
1588         mutex_enter(&pool->p_thread_lock);
1589         pool->p_threads--;
1590         if (pool->p_closing && svc_pool_tryexit(pool))
1591                 /* return -  thread exit will be handled at user level */
1592                 return;
1593         mutex_exit(&pool->p_thread_lock);
1594 
1595         /* return -  thread exit will be handled at user level */
1596 }
1597 
1598 /*
1599  * Exit a detached service thread that returned to svc_run
1600  * - decrement the `detached thread' count for the pool
1601  * - unlink the detached clone transport handle from the master
1602  * - free the clone structure
1603  * - return to userland for thread exit
1604  *
1605  * If this is the last thread on this transport then the call
1606  * to svc_clone_unlinkdetached() will destroy the transport.
1607  */
1608 static void
1609 svc_thread_exitdetached(SVCPOOL *pool, SVCXPRT *clone_xprt)
1610 {
1611         /* This must be a detached thread */
1612         ASSERT(clone_xprt->xp_master);
1613         ASSERT(clone_xprt->xp_detached);
1614         ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
1615 
1616         svc_clone_unlinkdetached(clone_xprt);
1617         svc_clone_free(clone_xprt);
1618 
1619         mutex_enter(&pool->p_thread_lock);
1620 
1621         ASSERT(pool->p_reserved_threads >= 0);
1622         ASSERT(pool->p_detached_threads > 0);
1623 
1624         pool->p_detached_threads--;
1625         if (pool->p_closing && svc_pool_tryexit(pool))
1626                 /* return -  thread exit will be handled at user level */
1627                 return;
1628         mutex_exit(&pool->p_thread_lock);
1629 
1630         /* return -  thread exit will be handled at user level */
1631 }
1632 
1633 /*
1634  * PSARC 2003/523 Contract Private Interface
1635  * svc_wait
1636  * Changes must be reviewed by Solaris File Sharing
1637  * Changes must be communicated to contract-2003-523@sun.com
1638  */
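     /*
      * A sketch of the expected userland usage (illustrative only; the
      * worker-creation routine below is hypothetical): the service
      * daemon calls svc_wait() in a loop and creates a new worker
      * thread for every zero return:
      *
      *		while (svc_wait(poolid) == 0)
      *			create_svc_run_thread();
      *
      * A non-zero return (e.g. EINTR or ECANCELED) tells the daemon to
      * stop creating threads and, eventually, to exit.
      */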
1639 int
1640 svc_wait(int id)
1641 {
1642         SVCPOOL *pool;
1643         int     err = 0;
1644         struct svc_globals *svc;
1645 
1646         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
1647         mutex_enter(&svc->svc_plock);
1648         pool = svc_pool_find(svc, id);
1649         mutex_exit(&svc->svc_plock);
1650 
1651         if (pool == NULL)
1652                 return (ENOENT);
1653 
1654         mutex_enter(&pool->p_user_lock);
1655 
1656         /* Check if there's already a user thread waiting on this pool */
1657         if (pool->p_user_waiting) {
1658                 mutex_exit(&pool->p_user_lock);
1659                 return (EBUSY);
1660         }
1661 
1662         pool->p_user_waiting = TRUE;
1663 
1664         /* Go to sleep, waiting for the signaled flag. */
1665         while (!pool->p_signal_create_thread && !pool->p_user_exit) {
1666                 if (cv_wait_sig(&pool->p_user_cv, &pool->p_user_lock) == 0) {
1667                         /* Interrupted, return to handle exit or signal */
1668                         pool->p_user_waiting = FALSE;
1669                         pool->p_signal_create_thread = FALSE;
1670                         mutex_exit(&pool->p_user_lock);
1671 
1672                         /*
1673                          * Thread has been interrupted and therefore
1674                          * the service daemon is leaving as well so
1675                          * let's go ahead and remove the service
1676                          * pool at this time.
1677                          */
1678                         mutex_enter(&svc->svc_plock);
1679                         svc_pool_unregister(svc, pool);
1680                         mutex_exit(&svc->svc_plock);
1681 
1682                         return (EINTR);
1683                 }
1684         }
1685 
1686         pool->p_signal_create_thread = FALSE;
1687         pool->p_user_waiting = FALSE;
1688 
1689         /*
1690          * About to exit the service pool. Set return value
1691          * to let the userland code know our intent. Signal
1692          * svc_thread_creator() so that it can clean up the
1693          * pool structure.
1694          */
1695         if (pool->p_user_exit) {
1696                 err = ECANCELED;
1697                 cv_signal(&pool->p_user_cv);
1698         }
1699 
1700         mutex_exit(&pool->p_user_lock);
1701 
1702         /* Return to userland with error code, for possible thread creation. */
1703         return (err);
1704 }
1705 
1706 /*
1707  * `Service threads' creator thread.
1708  * The creator thread waits for a signal to create a new thread.
1709  */
1710 static void
1711 svc_thread_creator(SVCPOOL *pool)
1712 {
1713         callb_cpr_t cpr_info;   /* CPR info for the creator thread */
1714 
1715         CALLB_CPR_INIT(&cpr_info, &pool->p_creator_lock, callb_generic_cpr,
1716             "svc_thread_creator");
1717 
1718         for (;;) {
1719                 mutex_enter(&pool->p_creator_lock);
1720 
1721                 /* Check if someone set the exit flag */
1722                 if (pool->p_creator_exit)
1723                         break;
1724 
1725                 /* Clear the `signaled' flag and go asleep */
1726                 pool->p_creator_signaled = FALSE;
1727 
1728                 CALLB_CPR_SAFE_BEGIN(&cpr_info);
1729                 cv_wait(&pool->p_creator_cv, &pool->p_creator_lock);
1730                 CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
1731 
1732                 /* Check if someone signaled to exit */
1733                 if (pool->p_creator_exit)
1734                         break;
1735 
1736                 mutex_exit(&pool->p_creator_lock);
1737 
1738                 mutex_enter(&pool->p_thread_lock);
1739 
1740                 /*
1741                  * When the pool is in closing state and all the transports
1742                  * are gone the creator should not create any new threads.
1743                  */
1744                 if (pool->p_closing) {
1745                         rw_enter(&pool->p_lrwlock, RW_READER);
1746                         if (pool->p_lcount == 0) {
1747                                 rw_exit(&pool->p_lrwlock);
1748                                 mutex_exit(&pool->p_thread_lock);
1749                                 continue;
1750                         }
1751                         rw_exit(&pool->p_lrwlock);
1752                 }
1753 
1754                 /*
1755                  * Create a new service thread now.
1756                  */
1757                 ASSERT(pool->p_reserved_threads >= 0);
1758                 ASSERT(pool->p_detached_threads >= 0);
1759 
1760                 if (pool->p_threads + pool->p_detached_threads <
1761                     pool->p_maxthreads) {
1762                         /*
1763                          * Signal the service pool wait thread
1764                          * only if it hasn't already been signaled.
1765                          */
1766                         mutex_enter(&pool->p_user_lock);
1767                         if (pool->p_signal_create_thread == FALSE) {
1768                                 pool->p_signal_create_thread = TRUE;
1769                                 cv_signal(&pool->p_user_cv);
1770                         }
1771                         mutex_exit(&pool->p_user_lock);
1772 
1773                 }
1774 
1775                 mutex_exit(&pool->p_thread_lock);
1776         }
1777 
1778         /*
1779          * Pool is closed. Cleanup and exit.
1780          */
1781 
1782         /* Signal userland creator thread that it can stop now. */
1783         mutex_enter(&pool->p_user_lock);
1784         pool->p_user_exit = TRUE;
1785         cv_broadcast(&pool->p_user_cv);
1786         mutex_exit(&pool->p_user_lock);
1787 
1788         /* Wait for svc_wait() to be done with the pool */
1789         mutex_enter(&pool->p_user_lock);
1790         while (pool->p_user_waiting) {
1791                 CALLB_CPR_SAFE_BEGIN(&cpr_info);
1792                 cv_wait(&pool->p_user_cv, &pool->p_user_lock);
1793                 CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
1794         }
1795         mutex_exit(&pool->p_user_lock);
1796 
1797         CALLB_CPR_EXIT(&cpr_info);
1798         svc_pool_cleanup(pool);
1799         zthread_exit();
1800 }
1801 
1802 /*
1803  * If the creator thread is idle, signal it to create
1804  * a new service thread.
1805  */
1806 static void
1807 svc_creator_signal(SVCPOOL *pool)
1808 {
1809         mutex_enter(&pool->p_creator_lock);
1810         if (pool->p_creator_signaled == FALSE) {
1811                 pool->p_creator_signaled = TRUE;
1812                 cv_signal(&pool->p_creator_cv);
1813         }
1814         mutex_exit(&pool->p_creator_lock);
1815 }
1816 
1817 /*
1818  * Notify the creator thread to clean up and exit.
1819  */
1820 static void
1821 svc_creator_signalexit(SVCPOOL *pool)
1822 {
1823         mutex_enter(&pool->p_creator_lock);
1824         pool->p_creator_exit = TRUE;
1825         cv_signal(&pool->p_creator_cv);
1826         mutex_exit(&pool->p_creator_lock);
1827 }
1828 
1829 /*
1830  * Polling part of the svc_run().
1831  * - search for a transport with a pending request
1832  * - when one is found then latch the request lock and return to svc_run()
1833  * - if there is no request go asleep and wait for a signal
1834  * - handle two exceptions:
1835  *   a) current transport is closing
1836  *   b) timeout waiting for a new request
1837  *   in both cases return to svc_run()
1838  */
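     /*
      * The return value is either a master transport handle with a
      * pending request and its xp_req_lock held, or one of the special
      * values SVC_EXPRTGONE, SVC_ETIMEDOUT or SVC_EINTR, which are
      * interpreted by svc_run() (see the ASSERT there).
      */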
1839 static SVCMASTERXPRT *
1840 svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
1841 {
1842         /*
1843          * Main loop iterates until
1844          * a) we find a pending request,
1845          * b) detect that the current transport is closing
1846          * c) time out waiting for a new request.
1847          */
1848         for (;;) {
1849                 SVCMASTERXPRT *next;
1850                 clock_t timeleft;
1851 
1852                 /*
1853                  * Step 1.
1854                  * Check if there is a pending request on the current
1855                  * transport handle so that we can avoid cloning.
1856                  * If so then decrement the `pending-request' count for
1857                  * the pool and return to svc_run().
1858                  *
1859                  * We need to prevent potential starvation. If a
1860                  * selected transport always has new requests coming in,
1861                  * the service threads will never switch to
1862                  * another transport. With a limited number of service
1863                  * threads some transports may never be serviced.
1864                  * To prevent such a scenario we pick up at most
1865                  * pool->p_max_same_xprt requests from the same transport
1866                  * and then take a hint from the xprt-ready queue or walk
1867                  * the transport list.
1868                  */
1869                 if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
1870                     clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
1871                         mutex_enter(&xprt->xp_req_lock);
1872                         if (xprt->xp_req_head)
1873                                 return (xprt);
1874                         mutex_exit(&xprt->xp_req_lock);
1875                 }
1876                 clone_xprt->xp_same_xprt = 0;
1877 
1878                 /*
1879                  * Step 2.
1880                  * If there is no request on the current transport try to
1881                  * find another transport with a pending request.
1882                  */
1883                 mutex_enter(&pool->p_req_lock);
1884                 pool->p_walkers++;
1885                 mutex_exit(&pool->p_req_lock);
1886 
1887                 /*
1888                  * Make sure that transports will not be destroyed just
1889                  * while we are checking them.
1890                  */
1891                 rw_enter(&pool->p_lrwlock, RW_READER);
1892 
1893                 for (;;) {
1894                         SVCMASTERXPRT *hint;
1895 
1896                         /*
1897                          * Get the next transport from the xprt-ready queue.
1898                          * This is a hint. There is no guarantee that the
1899                          * transport still has a pending request since it
1900                          * could be picked up by another thread in step 1.
1901                          *
1902                          * If the transport has a pending request then keep
1903                          * it locked. Decrement the `pending-requests' for
1904                          * the pool and `walking-threads' counts, and return
1905                          * to svc_run().
1906                          */
1907                         hint = svc_xprt_qget(pool);
1908 
1909                         if (hint && hint->xp_req_head) {
1910                                 mutex_enter(&hint->xp_req_lock);
1911                                 if (hint->xp_req_head) {
1912                                         rw_exit(&pool->p_lrwlock);
1913 
1914                                         mutex_enter(&pool->p_req_lock);
1915                                         pool->p_walkers--;
1916                                         mutex_exit(&pool->p_req_lock);
1917 
1918                                         return (hint);
1919                                 }
1920                                 mutex_exit(&hint->xp_req_lock);
1921                         }
1922 
1923                         /*
1924                          * If there was no hint in the xprt-ready queue then
1925                          * - if there are fewer pending requests than
1926                          *   polling threads, go asleep
1927                          * - otherwise check if there was an overflow in the
1928                          *   xprt-ready queue; if so, then we need to break
1929                          *   the `drain' mode
1930                          */
1931                         if (hint == NULL) {
1932                                 if (pool->p_reqs < pool->p_walkers) {
1933                                         mutex_enter(&pool->p_req_lock);
1934                                         if (pool->p_reqs < pool->p_walkers)
1935                                                 goto sleep;
1936                                         mutex_exit(&pool->p_req_lock);
1937                                 }
1938                                 if (pool->p_qoverflow) {
1939                                         break;
1940                                 }
1941                         }
1942                 }
1943 
1944                 /*
1945                  * If there was an overflow in the xprt-ready queue then we
1946                  * need to switch to the `drain' mode, i.e. walk through the
1947                  * pool's transport list and search for a transport with a
1948                  * pending request. If we manage to drain all the pending
1949                  * requests then we can clear the overflow flag. This will
1950                  * switch svc_poll() back to taking hints from the xprt-ready
1951                  * queue (which is generally more efficient).
1952                  *
1953                  * If there are no registered transports simply go asleep.
1954                  */
1955                 if (xprt == NULL && pool->p_lhead == NULL) {
1956                         mutex_enter(&pool->p_req_lock);
1957                         goto sleep;
1958                 }
1959 
1960                 /*
1961                  * `Walk' through the pool's list of master server
1962                  * transport handles. Continue to loop until there are
1963                  * fewer pending requests than walking threads.
1964                  */
1965                 next = xprt ? xprt->xp_next : pool->p_lhead;
1966 
1967                 for (;;) {
1968                         /*
1969                          * Check if there is a request on this transport.
1970                          *
1971                          * Since blocking on a locked mutex is very expensive,
1972                          * check for a request without the lock first. We may
1973                          * miss a request that is just being delivered, but
1974                          * that will cost at most one full walk through the list.
1975                          */
1976                         if (next->xp_req_head) {
1977                                 /*
1978                                  * Check again, now with a lock.
1979                                  */
1980                                 mutex_enter(&next->xp_req_lock);
1981                                 if (next->xp_req_head) {
1982                                         rw_exit(&pool->p_lrwlock);
1983 
1984                                         mutex_enter(&pool->p_req_lock);
1985                                         pool->p_walkers--;
1986                                         mutex_exit(&pool->p_req_lock);
1987 
1988                                         return (next);
1989                                 }
1990                                 mutex_exit(&next->xp_req_lock);
1991                         }
1992 
1993                         /*
1994                          * Continue to `walk' through the pool's
1995                          * transport list until there are fewer requests
1996                          * than walkers. Check this condition without
1997                          * a lock first to avoid contention on a mutex.
1998                          */
1999                         if (pool->p_reqs < pool->p_walkers) {
2000                                 /* Check again, now with the lock. */
2001                                 mutex_enter(&pool->p_req_lock);
2002                                 if (pool->p_reqs < pool->p_walkers)
2003                                         break;  /* goto sleep */
2004                                 mutex_exit(&pool->p_req_lock);
2005                         }
2006 
2007                         next = next->xp_next;
2008                 }
2009 
2010         sleep:
2011                 /*
2012                  * No work to do. Stop the `walk' and go asleep.
2013                  * Decrement the `walking-threads' count for the pool.
2014                  */
2015                 pool->p_walkers--;
2016                 rw_exit(&pool->p_lrwlock);
2017 
2018                 /*
2019                  * Count us as asleep, mark this thread as safe
2020                  * for suspend and wait for a request.
2021                  */
2022                 pool->p_asleep++;
2023                 timeleft = cv_reltimedwait_sig(&pool->p_req_cv,
2024                     &pool->p_req_lock, pool->p_timeout, TR_CLOCK_TICK);
2025 
2026                 /*
2027                  * If the drowsy flag is on, it means that
2028                  * someone has signaled a wakeup. In such a case
2029                  * the `asleep-threads' count has already been updated,
2030                  * so just clear the flag.
2031                  *
2032                  * If the drowsy flag is off then we need to update
2033                  * the `asleep-threads' count.
2034                  */
2035                 if (pool->p_drowsy) {
2036                         pool->p_drowsy = FALSE;
2037                         /*
2038                          * If the thread is here because it timed out,
2039                          * instead of returning SVC_ETIMEDOUT, it is
2040                          * time to do some more work.
2041                          */
2042                         if (timeleft == -1)
2043                                 timeleft = 1;
2044                 } else {
2045                         pool->p_asleep--;
2046                 }
2047                 mutex_exit(&pool->p_req_lock);
2048 
2049                 /*
2050                  * If we received a signal while waiting for a
2051                  * request, inform svc_run(), so that we can return
2052                  * to user level and exit.
2053                  */
2054                 if (timeleft == 0)
2055                         return (SVC_EINTR);
2056 
2057                 /*
2058                  * If the current transport is gone then notify
2059                  * svc_run() to unlink from it.
2060                  */
2061                 if (xprt && xprt->xp_wq == NULL)
2062                         return (SVC_EXPRTGONE);
2063 
2064                 /*
2065                  * If we have timed out waiting for a request inform
2066                  * svc_run() that we probably don't need this thread.
2067                  */
2068                 if (timeleft == -1)
2069                         return (SVC_ETIMEDOUT);
2070         }
2071 }
2072 
2073 /*
2074  * Calculate the memory space used by a message.
2075  */
2076 static size_t
2077 svc_msgsize(mblk_t *mp)
2078 {
2079         size_t count = 0;
2080 
2081         for (; mp; mp = mp->b_cont)
2082                 count += MBLKSIZE(mp);
2083 
2084         return (count);
2085 }
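
     /*
      * For example, a request chained across three mblks with 1024, 512
      * and 512 bytes of allocated buffer space is counted as 2048 bytes;
      * MBLKSIZE() measures the allocated data block, not just the bytes
      * actually used.
      */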
2086 
2087 /*
2088  * svc_flowcontrol() attempts to turn the flow control on or off for the
2089  * transport.
2090  *
2091  * On input the xprt->xp_full determines whether the flow control is currently
2092  * off (FALSE) or on (TRUE).  If it is off we do tests to see whether we should
2093  * turn it on, and vice versa.
2094  *
2095  * There are two conditions considered for the flow control.  Both conditions
2096  * have the low and the high watermark.  Once the high watermark is reached in
2097  * EITHER condition the flow control is turned on.  For turning the flow
2098  * control off BOTH conditions must be below the low watermark.
2099  *
2100  * Condition #1 - Number of requests queued:
2101  *
2102  * The max number of threads working on the pool is roughly pool->p_maxthreads.
2103  * Every thread could handle up to pool->p_max_same_xprt requests from one
2104  * transport before it moves to another transport.  See svc_poll() for details.
2105  * In case all threads in the pool are working on a transport they will handle
2106  * no more than enough_reqs (pool->p_maxthreads * pool->p_max_same_xprt)
2107  * requests in one shot from that transport.  We are turning the flow control
2108  * on once the high watermark is reached for a transport so that the underlying
2109  * queue knows the rate of incoming requests is higher than we are able to
2110  * handle.
2111  *
2112  * The high watermark: 2 * enough_reqs
2113  * The low watermark: enough_reqs
2114  *
2115  * Condition #2 - Length of the data payload for the queued messages/requests:
2116  *
2117  * We want to prevent a particular pool exhausting the memory, so once the
2118  * total length of queued requests for the whole pool reaches the high
2119  * watermark we start to turn on the flow control for significant memory
2120  * consumers (individual transports).  To keep the implementation simple
2121  * enough, this condition is not exact, because we count only the data part of
2122  * the queued requests and we ignore the overhead.  For our purposes this
2123  * should be enough.  We should also consider that up to pool->p_maxthreads
2124  * threads for the pool might work on large requests (this is not counted for
2125  * this condition).  We need to leave some space for the rest of the system and for
2126  * other big memory consumers (like ZFS).  Also, after the flow control is
2127  * turned on (on cots transports) we can start to accumulate a few megabytes in
2128  * queues for each transport.
2129  *
2130  * Usually, the big memory consumers are NFS WRITE requests, so we do not
2131  * expect to see this condition met for other than NFS pools.
2132  *
2133  * The high watermark: 1/5 of available memory
2134  * The low watermark: 1/6 of available memory
2135  *
2136  * Once the high watermark is reached we turn the flow control on only for
2137  * transports exceeding a per-transport memory limit.  The per-transport
2138  * fraction of memory is calculated as:
2139  *
2140  * the high watermark / number of transports
2141  *
2142  * For transports with less than the per-transport fraction of memory consumed,
2143  * the flow control is not turned on, so they are not blocked by a few "hungry"
2144  * transports.  Because of this, the total memory consumption for the
2145  * particular pool might grow up to 2 * the high watermark.
2146  *
2147  * The individual transports are unblocked once their consumption is below:
2148  *
2149  * per-transport fraction of memory / 2
2150  *
2151  * or once the total memory consumption for the whole pool falls below the low
2152  * watermark.
2153  *
2154  */
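     /*
      * A worked example with illustrative numbers (not defaults taken
      * from this file): with pool->p_maxthreads == 256 and
      * pool->p_max_same_xprt == 8, enough_reqs is 256 * 8 == 2048, so
      * flow control is turned on for a transport once 4096 requests are
      * queued on it, and may be turned off again only after its queue
      * drains to 2048 or fewer requests.  With 8 GB of available memory
      * and 16 registered transports, the pool-wide high watermark is
      * ~1.6 GB and the per-transport share is ~100 MB.
      */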
2155 static void
2156 svc_flowcontrol(SVCMASTERXPRT *xprt)
2157 {
2158         SVCPOOL *pool = xprt->xp_pool;
2159         size_t totalmem = ptob(physmem);
2160         int enough_reqs = pool->p_maxthreads * pool->p_max_same_xprt;
2161 
2162         ASSERT(MUTEX_HELD(&xprt->xp_req_lock));
2163 
2164         /* Should we turn the flow control on? */
2165         if (xprt->xp_full == FALSE) {
2166                 /* Is flow control disabled? */
2167                 if (svc_flowcontrol_disable != 0)
2168                         return;
2169 
2170                 /* Are there enough requests queued? */
2171                 if (xprt->xp_reqs >= enough_reqs * 2) {
2172                         xprt->xp_full = TRUE;
2173                         return;
2174                 }
2175 
2176                 /*
2177                  * If this pool uses over 20% of memory and this transport is
2178                  * a significant memory consumer then we are full
2179                  */
2180                 if (pool->p_size >= totalmem / 5 &&
2181                     xprt->xp_size >= totalmem / 5 / pool->p_lcount)
2182                         xprt->xp_full = TRUE;
2183 
2184                 return;
2185         }
2186 
2187         /* We might want to turn the flow control off */
2188 
2189         /* Do we still have enough requests? */
2190         if (xprt->xp_reqs > enough_reqs)
2191                 return;
2192 
2193         /*
2194          * If this pool still uses over 16% of memory and this transport is
2195          * still a significant memory consumer then we are still full
2196          */
2197         if (pool->p_size >= totalmem / 6 &&
2198             xprt->xp_size >= totalmem / 5 / pool->p_lcount / 2)
2199                 return;
2200 
2201         /* Turn the flow control off and make sure rpcmod is notified */
2202         xprt->xp_full = FALSE;
2203         xprt->xp_enable = TRUE;
2204 }
2205 
2206 /*
2207  * Main loop of the kernel RPC server
2208  * - wait for input (find a transport with a pending request).
2209  * - dequeue the request
2210  * - call a registered server routine to process the requests
2211  *
2212  * There can be many threads running concurrently in this loop
2213  * on the same or on different transports.
2214  */
2215 static int
2216 svc_run(SVCPOOL *pool)
2217 {
2218         SVCMASTERXPRT *xprt = NULL;     /* master transport handle  */
2219         SVCXPRT *clone_xprt;    /* clone for this thread    */
2220         proc_t *p = ttoproc(curthread);
2221 
2222         /* Allocate a clone transport handle for this thread */
2223         clone_xprt = svc_clone_init();
2224 
2225         /*
2226          * The loop iterates until the thread becomes
2227          * idle too long or the transport is gone.
2228          */
2229         for (;;) {
2230                 SVCMASTERXPRT *next;
2231                 mblk_t *mp;
2232                 bool_t enable;
2233                 size_t size;
2234 
2235                 TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
2236 
2237                 /*
2238                  * If the process is exiting/killed, return
2239                  * immediately without processing any more
2240                  * requests.
2241                  */
2242                 if (p->p_flag & (SEXITING | SKILLED)) {
2243                         svc_thread_exit(pool, clone_xprt);
2244                         return (EINTR);
2245                 }
2246 
2247                 /* Find a transport with a pending request */
2248                 next = svc_poll(pool, xprt, clone_xprt);
2249 
2250                 /*
2251                  * If svc_poll() finds a transport with a request
2252                  * it latches xp_req_lock on it. Therefore we need
2253                  * to dequeue the request and release the lock as
2254                  * soon as possible.
2255                  */
2256                 ASSERT(next != NULL &&
2257                     (next == SVC_EXPRTGONE ||
2258                     next == SVC_ETIMEDOUT ||
2259                     next == SVC_EINTR ||
2260                     MUTEX_HELD(&next->xp_req_lock)));
2261 
2262                 /* Ooops! Current transport is closing. Unlink now */
2263                 if (next == SVC_EXPRTGONE) {
2264                         svc_clone_unlink(clone_xprt);
2265                         xprt = NULL;
2266                         continue;
2267                 }
2268 
2269                 /* Ooops! Timeout while waiting for a request. Exit */
2270                 if (next == SVC_ETIMEDOUT) {
2271                         svc_thread_exit(pool, clone_xprt);
2272                         return (0);
2273                 }
2274 
2275                 /*
2276                  * Interrupted by a signal while waiting for a
2277                  * request. Return to userspace and exit.
2278                  */
2279                 if (next == SVC_EINTR) {
2280                         svc_thread_exit(pool, clone_xprt);
2281                         return (EINTR);
2282                 }
2283 
2284                 /*
2285                  * De-queue the request and release the request lock
2286                  * on this transport (latched by svc_poll()).
2287                  */
2288                 mp = next->xp_req_head;
2289                 next->xp_req_head = mp->b_next;
2290                 mp->b_next = (mblk_t *)0;
2291                 size = svc_msgsize(mp);
2292 
2293                 mutex_enter(&pool->p_req_lock);
2294                 pool->p_reqs--;
2295                 if (pool->p_reqs == 0)
2296                         pool->p_qoverflow = FALSE;
2297                 pool->p_size -= size;
2298                 mutex_exit(&pool->p_req_lock);
2299 
2300                 next->xp_reqs--;
2301                 next->xp_size -= size;
2302 
2303                 if (next->xp_full)
2304                         svc_flowcontrol(next);
2305 
2306                 TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
2307                     "rpc_que_req_deq:pool %p mp %p", pool, mp);
2308                 mutex_exit(&next->xp_req_lock);
2309 
2310                 /*
2311                  * If this is a new request on a current transport then
2312                  * the clone structure is already properly initialized.
2313                  * Otherwise, if the request is on a different transport,
2314                  * unlink from the current master and link to
2315                  * the one we got a request on.
2316                  */
2317                 if (next != xprt) {
2318                         if (xprt)
2319                                 svc_clone_unlink(clone_xprt);
2320                         svc_clone_link(next, clone_xprt, NULL);
2321                         xprt = next;
2322                 }
2323 
2324                 /*
2325                  * If there are more requests and req_cv hasn't
2326                  * been signaled yet then wake up one more thread now.
2327                  *
2328                  * We avoid signaling req_cv until the most recently
2329                  * signaled thread wakes up and gets CPU to clear
2330                  * the `drowsy' flag.
2331                  */
2332                 if (!(pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2333                     pool->p_asleep == 0)) {
2334                         mutex_enter(&pool->p_req_lock);
2335 
2336                         if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2337                             pool->p_asleep == 0)
2338                                 mutex_exit(&pool->p_req_lock);
2339                         else {
2340                                 pool->p_asleep--;
2341                                 pool->p_drowsy = TRUE;
2342 
2343                                 cv_signal(&pool->p_req_cv);
2344                                 mutex_exit(&pool->p_req_lock);
2345                         }
2346                 }
2347 
2348                 /*
2349                  * If there are no asleep/signaled threads, we are
2350                  * still below pool->p_maxthreads limit, and no thread is
2351                  * currently being created then signal the creator
2352                  * for one more service thread.
2353                  *
2354                  * The asleep and drowsy checks are not protected
2355                  * by a lock since it hurts performance and a wrong
2356                  * decision is not essential.
2357                  */
2358                 if (pool->p_asleep == 0 && !pool->p_drowsy &&
2359                     pool->p_threads + pool->p_detached_threads <
2360                     pool->p_maxthreads)
2361                         svc_creator_signal(pool);
2362 
2363                 /*
2364                  * Process the request.
2365                  */
2366                 svc_getreq(clone_xprt, mp);
2367 
2368                 /* If thread had a reservation it should have been canceled */
2369                 ASSERT(!clone_xprt->xp_reserved);
2370 
2371                 /*
2372                  * If the clone is marked detached then exit.
2373                  * The rpcmod slot has already been released
2374                  * when we detached this thread.
2375                  */
2376                 if (clone_xprt->xp_detached) {
2377                         svc_thread_exitdetached(pool, clone_xprt);
2378                         return (0);
2379                 }
2380 
2381                 /*
2382                  * Release our reference on the rpcmod
2383                  * slot attached to xp_wq->q_ptr.
2384                  */
2385                 mutex_enter(&xprt->xp_req_lock);
2386                 enable = xprt->xp_enable;
2387                 if (enable)
2388                         xprt->xp_enable = FALSE;
2389                 mutex_exit(&xprt->xp_req_lock);
2390                 (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
2391         }
2392         /* NOTREACHED */
2393 }
2394 
2395 /*
2396  * Flush any pending requests for the queue and
2397  * free the associated mblks.
2398  */
2399 void
2400 svc_queueclean(queue_t *q)
2401 {
2402         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2403         mblk_t *mp;
2404         SVCPOOL *pool;
2405 
2406         /*
2407          * clean up the requests
2408          */
2409         mutex_enter(&xprt->xp_req_lock);
2410         pool = xprt->xp_pool;
2411         while ((mp = xprt->xp_req_head) != NULL) {
2412                 /* remove the request from the list */
2413                 xprt->xp_req_head = mp->b_next;
2414                 mp->b_next = (mblk_t *)0;
2415                 (*RELE_PROC(xprt)) (xprt->xp_wq, mp, FALSE);
2416         }
2417 
2418         mutex_enter(&pool->p_req_lock);
2419         pool->p_reqs -= xprt->xp_reqs;
2420         pool->p_size -= xprt->xp_size;
2421         mutex_exit(&pool->p_req_lock);
2422 
2423         xprt->xp_reqs = 0;
2424         xprt->xp_size = 0;
2425         xprt->xp_full = FALSE;
2426         xprt->xp_enable = FALSE;
2427         mutex_exit(&xprt->xp_req_lock);
2428 }
2429 
2430 /*
2431  * This routine is called by rpcmod to inform kernel RPC that a
2432  * queue is closing. It is called after all the requests have been
2433  * picked up (that is after all the slots on the queue have
2434  * been released by kernel RPC). It is also guaranteed that no more
2435  * requests will be delivered on this transport.
2436  *
2437  * - clear xp_wq to mark the master server transport handle as closing
2438  * - if there are no more threads on this transport close/destroy it
2439  * - otherwise, leave the linked threads to close/destroy the transport
2440  *   later.
2441  */
2442 void
2443 svc_queueclose(queue_t *q)
2444 {
2445         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2446 
2447         if (xprt == NULL) {
2448                 /*
2449                  * If there is no master xprt associated with this stream,
2450                  * then there is nothing to do.  This happens regularly
2451                  * with connection-oriented listening streams created by
2452                  * nfsd.
2453                  */
2454                 return;
2455         }
2456 
2457         mutex_enter(&xprt->xp_thread_lock);
2458 
2459         ASSERT(xprt->xp_req_head == NULL);
2460         ASSERT(xprt->xp_wq != NULL);
2461 
2462         xprt->xp_wq = NULL;
2463 
2464         if (xprt->xp_threads == 0) {
2465                 SVCPOOL *pool = xprt->xp_pool;
2466 
2467                 /*
2468                  * svc_xprt_cleanup() destroys the transport
2469                  * or releases the transport thread lock
2470                  */
2471                 svc_xprt_cleanup(xprt, FALSE);
2472 
2473                 mutex_enter(&pool->p_thread_lock);
2474 
2475                 /*
2476                  * If the pool is in closing state and this was
2477                  * the last transport in the pool then signal the creator
2478                  * thread to clean up and exit.
2479                  */
2480                 if (pool->p_closing && svc_pool_tryexit(pool)) {
2481                         return;
2482                 }
2483                 mutex_exit(&pool->p_thread_lock);
2484         } else {
2485                 /*
2486                  * There are still some threads linked to the transport.  They
2487                  * are very likely sleeping in svc_poll().  We could wake them
2488                  * up by broadcasting on the p_req_cv condition variable, but
2489                  * that might give us a performance penalty if there are too
2490                  * many sleeping threads.
2491                  *
2492                  * Instead, we do nothing here.  The linked threads will unlink
2493                  * themselves and destroy the transport once they are woken up
2494                  * on timeout or by a new request.  There is no reason to
2495                  * hurry the thread wakeup now.
2496                  */
2497 
2498                 /*
2499                  *  NOTICE: No references to the master transport structure
2500                  *          beyond this point!
2501                  */
2502                 mutex_exit(&xprt->xp_thread_lock);
2503         }
2504 }
2505 
2506 /*
2507  * Interrupt `request delivery' routine called from rpcmod
2508  * - put a request at the tail of the transport request queue
2509  * - insert a hint for svc_poll() into the xprt-ready queue
2510  * - increment the `pending-requests' count for the pool
2511  * - handle flow control
2512  * - wake up a thread sleeping in svc_poll() if necessary
2513  * - if all the threads are running ask the creator for a new one.
2514  */
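     /*
      * Returns TRUE when the request was queued, or FALSE when it was
      * refused because the transport is flow controlled; in the latter
      * case the mblk is not consumed and the caller keeps ownership
      * of it.
      */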
2515 bool_t
2516 svc_queuereq(queue_t *q, mblk_t *mp, bool_t flowcontrol)
2517 {
2518         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2519         SVCPOOL *pool = xprt->xp_pool;
2520         size_t size;
2521 
2522         TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
2523 
2524         ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
2525             mp->b_datap->db_type != M_DATA);
2526 
2527         /*
2528          * Step 1.
2529          * Grab the transport's request lock and the
2530          * pool's request lock so that when we put
2531          * the request at the tail of the transport's
2532          * request queue, possibly put the request on
2533          * the xprt ready queue, and increment the
2534          * pending request count, it all looks atomic.
2535          */
2536         mutex_enter(&xprt->xp_req_lock);
2537         if (flowcontrol && xprt->xp_full) {
2538                 mutex_exit(&xprt->xp_req_lock);
2539 
2540                 return (FALSE);
2541         }
2542         ASSERT(xprt->xp_full == FALSE);
2543         mutex_enter(&pool->p_req_lock);
2544         if (xprt->xp_req_head == NULL)
2545                 xprt->xp_req_head = mp;
2546         else
2547                 xprt->xp_req_tail->b_next = mp;
2548         xprt->xp_req_tail = mp;
2549 
2550         /*
2551          * Step 2.
2552          * Insert a hint into the xprt-ready queue, increment
2553          * counters, handle flow control, and wake up
2554          * a thread sleeping in svc_poll() if necessary.
2555          */
2556 
2557         /* Insert pointer to this transport into the xprt-ready queue */
2558         svc_xprt_qput(pool, xprt);
2559 
2560         /* Increment counters */
2561         pool->p_reqs++;
2562         xprt->xp_reqs++;
2563 
2564         size = svc_msgsize(mp);
2565         xprt->xp_size += size;
2566         pool->p_size += size;
2567 
2568         /* Handle flow control */
2569         if (flowcontrol)
2570                 svc_flowcontrol(xprt);
2571 
2572         TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
2573             "rpc_que_req_enq:pool %p mp %p", pool, mp);
2574 
2575         /*
2576          * If there are more requests and req_cv hasn't
2577          * been signaled yet then wake up one more thread now.
2578          *
2579          * We avoid signaling req_cv until the most recently
2580          * signaled thread wakes up and gets CPU to clear
2581          * the `drowsy' flag.
2582          */
2583         if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2584             pool->p_asleep == 0) {
2585                 mutex_exit(&pool->p_req_lock);
2586         } else {
2587                 pool->p_drowsy = TRUE;
2588                 pool->p_asleep--;
2589 
2590                 /*
2591                  * Signal wakeup and drop the request lock.
2592                  */
2593                 cv_signal(&pool->p_req_cv);
2594                 mutex_exit(&pool->p_req_lock);
2595         }
2596         mutex_exit(&xprt->xp_req_lock);
2597 
2598         /*
2599          * Step 3.
2600          * If there are no asleep/signaled threads, we are
2601          * still below pool->p_maxthreads limit, and no thread is
2602          * currently being created then signal the creator
2603          * for one more service thread.
2604          *
2605          * The asleep and drowsy checks are not protected
2606          * by a lock since it hurts performance and a wrong
2607          * decision is not essential.
2608          */
2609         if (pool->p_asleep == 0 && !pool->p_drowsy &&
2610             pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
2611                 svc_creator_signal(pool);
2612 
2613         TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
2614             "svc_queuereq_end:(%S)", "end");
2615 
2616         return (TRUE);
2617 }
2618 
2619 /*
2620  * Reserve a service thread so that it can be detached later.
2621  * This reservation is required to make sure that when it tries to
2622  * detach itself the total number of detached threads does not exceed
2623  * pool->p_maxthreads - pool->p_redline (i.e. so that at least
2624  * pool->p_redline threads remain non-detached).
2625  *
2626  * If the thread does not detach itself later, it should cancel the
2627  * reservation before returning to svc_run().
2628  *
2629  * - check if there is room for more reserved/detached threads
2630  * - if so, then increment the `reserved threads' count for the pool
2631  * - mark the thread as reserved (setting the flag in the clone transport
2632  *   handle for this thread)
2633  * - returns 1 if the reservation succeeded, 0 if it failed.
2634  */
2635 int
2636 svc_reserve_thread(SVCXPRT *clone_xprt)
2637 {
2638         SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
2639 
2640         /* Recursive reservations are not allowed */
2641         ASSERT(!clone_xprt->xp_reserved);
2642         ASSERT(!clone_xprt->xp_detached);
2643 
2644         /* Check pool counts if there is room for reservation */
2645         mutex_enter(&pool->p_thread_lock);
2646         if (pool->p_reserved_threads + pool->p_detached_threads >=
2647             pool->p_maxthreads - pool->p_redline) {
2648                 mutex_exit(&pool->p_thread_lock);
2649                 return (0);
2650         }
2651         pool->p_reserved_threads++;
2652         mutex_exit(&pool->p_thread_lock);
2653 
2654         /* Mark the thread (clone handle) as reserved */
2655         clone_xprt->xp_reserved = TRUE;
2656 
2657         return (1);
2658 }
2659 
2660 /*
2661  * Cancel a reservation for a thread.
2662  * - decrement the `reserved threads' count for the pool
2663  * - clear the flag in the clone transport handle for this thread.
2664  */
2665 void
2666 svc_unreserve_thread(SVCXPRT *clone_xprt)
2667 {
2668         SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
2669 
2670         /* Thread must have a reservation */
2671         ASSERT(clone_xprt->xp_reserved);
2672         ASSERT(!clone_xprt->xp_detached);
2673 
2674         /* Decrement global count */
2675         mutex_enter(&pool->p_thread_lock);
2676         pool->p_reserved_threads--;
2677         mutex_exit(&pool->p_thread_lock);
2678 
2679         /* Clear reservation flag */
2680         clone_xprt->xp_reserved = FALSE;
2681 }
2682 
2683 /*
2684  * Detach a thread from its transport, so that it can block for an
2685  * extended time.  Because the transport can be closed after the thread is
2686  * detached, the thread should have already sent off a reply if it was
2687  * going to send one.
2688  *
2689  * - decrement `non-detached threads' count and increment `detached threads'
2690  *   counts for the transport
2691  * - decrement the  `non-detached threads' and `reserved threads'
2692  *   counts and increment the `detached threads' count for the pool
2693  * - release the rpcmod slot
2694  * - mark the clone (thread) as detached.
2695  *
2696  * No need to return a pointer to the thread's CPR information, since
2697  * the thread has a userland identity.
2698  *
2699  * NOTICE: a thread must not detach itself without making a prior reservation
2700  *         through svc_reserve_thread().
2701  */
2702 callb_cpr_t *
2703 svc_detach_thread(SVCXPRT *clone_xprt)
2704 {
2705         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
2706         SVCPOOL *pool = xprt->xp_pool;
2707         bool_t enable;
2708 
2709         /* Thread must have a reservation */
2710         ASSERT(clone_xprt->xp_reserved);
2711         ASSERT(!clone_xprt->xp_detached);
2712 
2713         /* Bookkeeping for this transport */
2714         mutex_enter(&xprt->xp_thread_lock);
2715         xprt->xp_threads--;
2716         xprt->xp_detached_threads++;
2717         mutex_exit(&xprt->xp_thread_lock);
2718 
2719         /* Bookkeeping for the pool */
2720         mutex_enter(&pool->p_thread_lock);
2721         pool->p_threads--;
2722         pool->p_reserved_threads--;
2723         pool->p_detached_threads++;
2724         mutex_exit(&pool->p_thread_lock);
2725 
2726         /* Release an rpcmod slot for this request */
2727         mutex_enter(&xprt->xp_req_lock);
2728         enable = xprt->xp_enable;
2729         if (enable)
2730                 xprt->xp_enable = FALSE;
2731         mutex_exit(&xprt->xp_req_lock);
2732         (*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL, enable);
2733 
2734         /* Mark the clone (thread) as detached */
2735         clone_xprt->xp_reserved = FALSE;
2736         clone_xprt->xp_detached = TRUE;
2737 
2738         return (NULL);
2739 }
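
     /*
      * Taken together, the reservation protocol as seen from a service
      * routine looks roughly like this (an illustrative sketch, not a
      * quote from an existing consumer):
      *
      *		if (svc_reserve_thread(clone_xprt)) {
      *			...
      *			if (must_block_for_a_long_time)
      *				(void) svc_detach_thread(clone_xprt);
      *			else
      *				svc_unreserve_thread(clone_xprt);
      *		}
      */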
2740 
2741 /*
2742  * This routine is responsible for extracting the RDMA plugin master XPRT,
2743  * unregistering it from the SVCPOOL, and initiating plugin specific cleanup.
2744  * It is passed a list/group of rdma transports as records which are
2745  * active in a given registered or unregistered kRPC thread pool. It shuts
2746  * down all active rdma transports in that pool. If the thread active on the
2747  * transport happens to be the last thread for that pool, it will signal the
2748  * creator thread to clean up the pool and destroy the xprt in svc_queueclose().
2749  */
2750 void
2751 rdma_stop(rdma_xprt_group_t *rdma_xprts)
2752 {
2753         SVCMASTERXPRT *xprt;
2754         rdma_xprt_record_t *curr_rec;
2755         queue_t *q;
2756         mblk_t *mp;
2757         int i, rtg_count;
2758         SVCPOOL *pool;
2759 
2760         if (rdma_xprts->rtg_count == 0)
2761                 return;
2762 
2763         rtg_count = rdma_xprts->rtg_count;
2764 
2765         for (i = 0; i < rtg_count; i++) {
2766                 curr_rec = rdma_xprts->rtg_listhead;
2767                 rdma_xprts->rtg_listhead = curr_rec->rtr_next;
2768                 rdma_xprts->rtg_count--;
2769                 curr_rec->rtr_next = NULL;
2770                 xprt = curr_rec->rtr_xprt_ptr;
2771                 q = xprt->xp_wq;
2772                 svc_rdma_kstop(xprt);
2773 
2774                 mutex_enter(&xprt->xp_req_lock);
2775                 pool = xprt->xp_pool;
2776                 while ((mp = xprt->xp_req_head) != NULL) {
2777                         rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
2778 
2779                         /* remove the request from the list */
2780                         xprt->xp_req_head = mp->b_next;
2781                         mp->b_next = (mblk_t *)0;
2782 
2783                         RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
2784                         RDMA_REL_CONN(rdp->conn);
2785                         freemsg(mp);
2786                 }
2787                 mutex_enter(&pool->p_req_lock);
2788                 pool->p_reqs -= xprt->xp_reqs;
2789                 pool->p_size -= xprt->xp_size;
2790                 mutex_exit(&pool->p_req_lock);
2791                 xprt->xp_reqs = 0;
2792                 xprt->xp_size = 0;
2793                 xprt->xp_full = FALSE;
2794                 xprt->xp_enable = FALSE;
2795                 mutex_exit(&xprt->xp_req_lock);
2796                 svc_queueclose(q);
2797 #ifdef  DEBUG
2798                 if (rdma_check)
2799                         cmn_err(CE_NOTE, "rdma_stop: Exited svc_queueclose\n");
2800 #endif
2801                 /*
2802                  * Free the rdma transport record for the expunged rdma
2803                  * based master transport handle.
2804                  */
2805                 kmem_free(curr_rec, sizeof (rdma_xprt_record_t));
2806                 if (!rdma_xprts->rtg_listhead)
2807                         break;
2808         }
2809 }
2810 
2811 
2812 /*
2813  * rpc_msg_dup/rpc_msg_free
2814  * Currently only used by svc_rpcsec_gss.c but put in this file as it
2815  * may be useful to others in the future.
2816  * But future consumers should be careful because so far
2817  *   - only tested/used for call msgs (not reply)
2818  *   - only tested/used with call verf oa_length==0
2819  */
2820 struct rpc_msg *
2821 rpc_msg_dup(struct rpc_msg *src)
2822 {
2823         struct rpc_msg *dst;
2824         struct opaque_auth oa_src, oa_dst;
2825 
2826         dst = kmem_alloc(sizeof (*dst), KM_SLEEP);
2827 
2828         dst->rm_xid = src->rm_xid;
2829         dst->rm_direction = src->rm_direction;
2830 
2831         dst->rm_call.cb_rpcvers = src->rm_call.cb_rpcvers;
2832         dst->rm_call.cb_prog = src->rm_call.cb_prog;
2833         dst->rm_call.cb_vers = src->rm_call.cb_vers;
2834         dst->rm_call.cb_proc = src->rm_call.cb_proc;
2835 
2836         /* dup opaque auth call body cred */
2837         oa_src = src->rm_call.cb_cred;
2838 
2839         oa_dst.oa_flavor = oa_src.oa_flavor;
2840         oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
2841 
2842         bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
2843         oa_dst.oa_length = oa_src.oa_length;
2844 
2845         dst->rm_call.cb_cred = oa_dst;
2846 
2847         /* dup or just alloc opaque auth call body verifier */
2848         if (src->rm_call.cb_verf.oa_length > 0) {
2849                 oa_src = src->rm_call.cb_verf;
2850 
2851                 oa_dst.oa_flavor = oa_src.oa_flavor;
2852                 oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
2853 
2854                 bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
2855                 oa_dst.oa_length = oa_src.oa_length;
2856 
2857                 dst->rm_call.cb_verf = oa_dst;
2858         } else {
2859                 oa_dst.oa_flavor = -1;  /* will be set later */
2860                 oa_dst.oa_base = kmem_alloc(MAX_AUTH_BYTES, KM_SLEEP);
2861 
2862                 oa_dst.oa_length = 0;   /* will be set later */
2863 
2864                 dst->rm_call.cb_verf = oa_dst;
2865         }
2866         return (dst);
2872 }
2873 
2874 void
2875 rpc_msg_free(struct rpc_msg **msg, int cb_verf_oa_length)
2876 {
2877         struct rpc_msg *m = *msg;
2878 
2879         kmem_free(m->rm_call.cb_cred.oa_base, m->rm_call.cb_cred.oa_length);
2880         m->rm_call.cb_cred.oa_base = NULL;
2881         m->rm_call.cb_cred.oa_length = 0;
2882 
2883         kmem_free(m->rm_call.cb_verf.oa_base, cb_verf_oa_length);
2884         m->rm_call.cb_verf.oa_base = NULL;
2885         m->rm_call.cb_verf.oa_length = 0;
2886 
2887         kmem_free(m, sizeof (*m));
2888         *msg = NULL;
2889 }
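
     /*
      * Note on pairing the two routines above: for a message duplicated
      * by rpc_msg_dup() whose original call verifier had oa_length == 0,
      * the verifier buffer was allocated with MAX_AUTH_BYTES, so the
      * matching release is rpc_msg_free(&dup, MAX_AUTH_BYTES).
      */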