1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 
  22 /*
  23  * Copyright 2012 Marcel Telka <marcel@telka.sk>
  24  * Copyright 2015 Nexenta Systems, Inc.  All rights reserved.
  25  * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
  26  */
  27 
  28 /*
  29  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  30  * Use is subject to license terms.
  31  */
  32 
  33 /*
  34  * Copyright 1993 OpenVision Technologies, Inc., All Rights Reserved.
  35  */
  36 
  37 /*      Copyright (c) 1983, 1984, 1985,  1986, 1987, 1988, 1989 AT&T        */
  38 /*      All Rights Reserved     */
  39 
  40 /*
  41  * Portions of this source code were derived from Berkeley 4.3 BSD
  42  * under license from the Regents of the University of California.
  43  */
  44 
  45 /*
  46  * Server-side remote procedure call interface.
  47  *
  48  * Master transport handle (SVCMASTERXPRT).
  49  *   The master transport handle structure is shared among service
  50  *   threads processing events on the transport. Some fields in the
   51  *   master structure are protected by locks:
  52  *   - xp_req_lock protects the request queue:
  53  *      xp_req_head, xp_req_tail, xp_reqs, xp_size, xp_full, xp_enable
  54  *   - xp_thread_lock protects the thread (clone) counts
  55  *      xp_threads, xp_detached_threads, xp_wq
  56  *   Each master transport is registered to exactly one thread pool.
  57  *
  58  * Clone transport handle (SVCXPRT)
  59  *   The clone transport handle structure is a per-service-thread handle
  60  *   to the transport. The structure carries all the fields/buffers used
  61  *   for request processing. A service thread or, in other words, a clone
  62  *   structure, can be linked to an arbitrary master structure to process
  63  *   requests on this transport. The master handle keeps track of reference
  64  *   counts of threads (clones) linked to it. A service thread can switch
  65  *   to another transport by unlinking its clone handle from the current
  66  *   transport and linking to a new one. Switching is relatively inexpensive
  67  *   but it involves locking (master's xprt->xp_thread_lock).
  68  *
  69  * Pools.
  70  *   A pool represents a kernel RPC service (NFS, Lock Manager, etc.).
  71  *   Transports related to the service are registered to the service pool.
  72  *   Service threads can switch between different transports in the pool.
  73  *   Thus, each service has its own pool of service threads. The maximum
   74  *   number of threads in a pool is pool->p_maxthreads. This limit
   75  *   restricts resource usage by the service. Some fields are protected
  76  *   by locks:
  77  *   - p_req_lock protects several counts and flags:
  78  *      p_reqs, p_size, p_walkers, p_asleep, p_drowsy, p_req_cv
  79  *   - p_thread_lock governs other thread counts:
  80  *      p_threads, p_detached_threads, p_reserved_threads, p_closing
  81  *
  82  *   In addition, each pool contains a doubly-linked list of transports,
  83  *   an `xprt-ready' queue and a creator thread (see below). Threads in
  84  *   the pool share some other parameters such as stack size and
  85  *   polling timeout.
  86  *
  87  *   Pools are initialized through the svc_pool_create() function called from
  88  *   the nfssys() system call. However, thread creation must be done by
  89  *   the userland agent. This is done by using SVCPOOL_WAIT and
  90  *   SVCPOOL_RUN arguments to nfssys(), which call svc_wait() and
  91  *   svc_do_run(), respectively. Once the pool has been initialized,
  92  *   the userland process must set up a 'creator' thread. This thread
  93  *   should park itself in the kernel by calling svc_wait(). If
  94  *   svc_wait() returns successfully, it should fork off a new worker
  95  *   thread, which then calls svc_do_run() in order to get work. When
  96  *   that thread is complete, svc_do_run() will return, and the user
  97  *   program should call thr_exit().
  98  *
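       *   As an illustration only, a sketch of the userland creator loop
       *   described above, assuming the private _nfssys() syscall wrapper
       *   and Solaris threads (svcstart() is a made-up worker function):
       *
       *	static void *
       *	svcstart(void *arg)
       *	{
       *		int id = *(int *)arg;
       *
       *		(void) _nfssys(SVCPOOL_RUN, &id);    <- calls svc_do_run()
       *		thr_exit(NULL);
       *		return (NULL);
       *	}
       *
       *	while (_nfssys(SVCPOOL_WAIT, &id) == 0)  <- parks in svc_wait()
       *		(void) thr_create(NULL, 0, svcstart, &id,
       *		    THR_BOUND | THR_DETACHED, NULL);
       *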
  99  *   When we try to register a new pool and there is an old pool with
 100  *   the same id in the doubly linked pool list (this happens when we kill
 101  *   and restart nfsd or lockd), then we unlink the old pool from the list
 102  *   and mark its state as `closing'. After that the transports can still
 103  *   process requests but new transports won't be registered. When all the
 104  *   transports and service threads associated with the pool are gone the
 105  *   creator thread (see below) will clean up the pool structure and exit.
 106  *
 107  * svc_queuereq() and svc_run().
 108  *   The kernel RPC server is interrupt driven. The svc_queuereq() interrupt
 109  *   routine is called to deliver an RPC request. The service threads
 110  *   loop in svc_run(). The interrupt function queues a request on the
  111  *   transport's queue and makes sure that the request is serviced.
  112  *   It may either wake up one of the sleeping threads, or ask for a new thread
 113  *   to be created, or, if the previous request is just being picked up, do
 114  *   nothing. In the last case the service thread that is picking up the
 115  *   previous request will wake up or create the next thread. After a service
 116  *   thread processes a request and sends a reply it returns to svc_run()
 117  *   and svc_run() calls svc_poll() to find new input.
 118  *
 119  * svc_poll().
 120  *   In order to avoid unnecessary locking, which causes performance
 121  *   problems, we always look for a pending request on the current transport.
 122  *   If there is none we take a hint from the pool's `xprt-ready' queue.
  123  *   If the queue has overflowed we switch to the `drain' mode, checking
  124  *   each transport in the pool's transport list. Once we find a
 125  *   master transport handle with a pending request we latch the request
 126  *   lock on this transport and return to svc_run(). If the request
  127  *   belongs to a transport different from the one the service thread is
  128  *   linked to, we need to unlink and link again.
 129  *
  130  *   A service thread goes to sleep when there are no pending
  131  *   requests on the transports registered with the pool.
  132  *   All the pool's threads sleep on the same condition variable.
  133  *   If a thread has been sleeping for too long (by default
  134  *   5 seconds) it wakes up and exits.  Also, when a transport
  135  *   is closing, sleeping threads wake up to unlink from this transport.
 136  *
 137  * The `xprt-ready' queue.
  138  *   If a service thread finds no request on the transport it is currently
  139  *   linked to, it will look for another transport with a pending request. To make
 140  *   this search more efficient each pool has an `xprt-ready' queue.
 141  *   The queue is a FIFO. When the interrupt routine queues a request it also
 142  *   inserts a pointer to the transport into the `xprt-ready' queue. A
  143  *   thread looking for a transport with a pending request can pop a
  144  *   transport and check for a request. The request may already be gone
  145  *   since it could have been taken by a thread linked to that transport. In such
  146  *   a case we try the next hint. The `xprt-ready' queue has a fixed size (by
 147  *   default 256 nodes). If it overflows svc_poll() has to switch to the
 148  *   less efficient but safe `drain' mode and walk through the pool's
 149  *   transport list.
 150  *
  151  *   Both the svc_poll() loop and the `xprt-ready' queue are optimized
  152  *   for the peak-load case, i.e. for the situation when the queue is not
  153  *   empty, there are always a few pending requests, and a service
  154  *   thread that has just processed a request does not go to sleep but
  155  *   immediately picks up the next request.
 156  *
 157  * Thread creator.
 158  *   Each pool has a thread creator associated with it. The creator thread
 159  *   sleeps on a condition variable and waits for a signal to create a
 160  *   service thread. The actual thread creation is done in userland by
 161  *   the method described in "Pools" above.
 162  *
 163  *   Signaling threads should turn on the `creator signaled' flag, and
 164  *   can avoid sending signals when the flag is on. The flag is cleared
 165  *   when the thread is created.
 166  *
  167  *   When the pool is in the closing state (i.e. it has already been
  168  *   unregistered from the pool list) the last thread on the last
  169  *   transport in the pool should turn the p_creator_exit flag on.
  170  *   The creator thread will then clean up the pool structure and exit.
 171  *
 172  * Thread reservation; Detaching service threads.
 173  *   A service thread can detach itself to block for an extended amount
 174  *   of time. However, to keep the service active we need to guarantee
 175  *   at least pool->p_redline non-detached threads that can process incoming
  176  *   requests. Thus, the maximum number of detached and reserved threads is
  177  *   pool->p_maxthreads - pool->p_redline. A service thread should first acquire
 178  *   a reservation, and if the reservation was granted it can detach itself.
 179  *   If a reservation was granted but the thread does not detach itself
 180  *   it should cancel the reservation before it returns to svc_run().
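       *
       *   For example, with pool->p_maxthreads == 16 and pool->p_redline == 1,
       *   at most 16 - 1 == 15 threads may be detached or hold reservations at
       *   any one time, so at least one thread always remains available to
       *   pick up incoming requests.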
 181  */
 182 
 183 #include <sys/param.h>
 184 #include <sys/types.h>
 185 #include <rpc/types.h>
 186 #include <sys/socket.h>
 187 #include <sys/time.h>
 188 #include <sys/tiuser.h>
 189 #include <sys/t_kuser.h>
 190 #include <netinet/in.h>
 191 #include <rpc/xdr.h>
 192 #include <rpc/auth.h>
 193 #include <rpc/clnt.h>
 194 #include <rpc/rpc_msg.h>
 195 #include <rpc/svc.h>
 196 #include <sys/proc.h>
 197 #include <sys/user.h>
 198 #include <sys/stream.h>
 199 #include <sys/strsubr.h>
 200 #include <sys/strsun.h>
 201 #include <sys/tihdr.h>
 202 #include <sys/debug.h>
 203 #include <sys/cmn_err.h>
 204 #include <sys/file.h>
 205 #include <sys/systm.h>
 206 #include <sys/callb.h>
 207 #include <sys/vtrace.h>
 208 #include <sys/zone.h>
 209 #include <nfs/nfs.h>
 210 #include <sys/tsol/label_macro.h>
 211 
 212 /*
 213  * Defines for svc_poll()
 214  */
 215 #define SVC_EXPRTGONE ((SVCMASTERXPRT *)1)      /* Transport is closing */
 216 #define SVC_ETIMEDOUT ((SVCMASTERXPRT *)2)      /* Timeout */
 217 #define SVC_EINTR ((SVCMASTERXPRT *)3)          /* Interrupted by signal */
 218 
 219 /*
 220  * Default stack size for service threads.
 221  */
 222 #define DEFAULT_SVC_RUN_STKSIZE         (0)     /* default kernel stack */
 223 
 224 volatile int    svc_default_stksize = DEFAULT_SVC_RUN_STKSIZE;
 225 
 226 /*
 227  * Default polling timeout for service threads.
 228  * Multiplied by hz when used.
 229  */
 230 #define DEFAULT_SVC_POLL_TIMEOUT        (5)     /* seconds */
 231 
 232 clock_t svc_default_timeout = DEFAULT_SVC_POLL_TIMEOUT;
 233 
 234 /*
 235  * Size of the `xprt-ready' queue.
 236  */
 237 #define DEFAULT_SVC_QSIZE               (256)   /* qnodes */
 238 
 239 size_t svc_default_qsize = DEFAULT_SVC_QSIZE;
 240 
 241 /*
 242  * Default limit for the number of service threads.
 243  */
 244 #define DEFAULT_SVC_MAXTHREADS          (INT16_MAX)
 245 
 246 int    svc_default_maxthreads = DEFAULT_SVC_MAXTHREADS;
 247 
 248 /*
 249  * Maximum number of requests from the same transport (in `drain' mode).
 250  */
 251 #define DEFAULT_SVC_MAX_SAME_XPRT       (8)
 252 
 253 volatile int    svc_default_max_same_xprt = DEFAULT_SVC_MAX_SAME_XPRT;
 254 
 255 
 256 /*
 257  * Default `Redline' of non-detached threads.
 258  * Total number of detached and reserved threads in an RPC server
 259  * thread pool is limited to pool->p_maxthreads - svc_redline.
 260  */
 261 #define DEFAULT_SVC_REDLINE             (1)
 262 
 263 int    svc_default_redline = DEFAULT_SVC_REDLINE;
 264 
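/*
 * As an illustration, each of the svc_default_* tunables above can be
 * overridden in /etc/system; assuming this file is built into the
 * rpcmod module (an assumption; adjust the module name accordingly),
 * a line such as the following would do:
 *
 *	set rpcmod:svc_default_stksize = 0x4000
 */
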
 265 /*
 266  * A node for the `xprt-ready' queue.
 267  * See below.
 268  */
 269 struct __svcxprt_qnode {
 270         __SVCXPRT_QNODE *q_next;
 271         SVCMASTERXPRT   *q_xprt;
 272 };
 273 
 274 /*
 275  * Global SVC variables (private).
 276  */
 277 struct svc_globals {
 278         SVCPOOL         *svc_pools;
 279         kmutex_t        svc_plock;
 280 };
 281 
 282 /*
  283  * Debug variable to check for RDMA-based
  284  * transport startup and cleanup. Controlled
  285  * through /etc/system. Off by default.
 286  */
 287 int rdma_check = 0;
 288 
 289 /*
 290  * This allows disabling flow control in svc_queuereq().
 291  */
 292 volatile int svc_flowcontrol_disable = 0;
 293 
 294 /*
 295  * Authentication parameters list.
 296  */
 297 static caddr_t rqcred_head;
 298 static kmutex_t rqcred_lock;
 299 
 300 /*
 301  * If true, then keep quiet about version mismatch.
  302  * This macro is for broadcast RPC only. We have no broadcast RPC in
  303  * the kernel now, but one may define a flag in the transport structure
  304  * and redefine this macro (see the sketch below).
 305  */
 306 #define version_keepquiet(xprt) (FALSE)
 307 
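/*
 * For illustration, if such a flag existed (xp_broadcast below is a
 * hypothetical SVCMASTERXPRT field, not a real one), the redefinition
 * might look like:
 *
 *	#define	version_keepquiet(xprt)	((xprt)->xp_master->xp_broadcast)
 */
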
 308 /*
 309  * ZSD key used to retrieve zone-specific svc globals
 310  */
 311 static zone_key_t svc_zone_key;
 312 
 313 static void svc_callout_free(SVCMASTERXPRT *);
 314 static void svc_xprt_qinit(SVCPOOL *, size_t);
 315 static void svc_xprt_qdestroy(SVCPOOL *);
 316 static void svc_thread_creator(SVCPOOL *);
 317 static void svc_creator_signal(SVCPOOL *);
 318 static void svc_creator_signalexit(SVCPOOL *);
 319 static void svc_pool_unregister(struct svc_globals *, SVCPOOL *);
 320 static int svc_run(SVCPOOL *);
 321 
 322 /* ARGSUSED */
 323 static void *
 324 svc_zoneinit(zoneid_t zoneid)
 325 {
 326         struct svc_globals *svc;
 327 
 328         svc = kmem_alloc(sizeof (*svc), KM_SLEEP);
 329         mutex_init(&svc->svc_plock, NULL, MUTEX_DEFAULT, NULL);
 330         svc->svc_pools = NULL;
 331         return (svc);
 332 }
 333 
 334 /* ARGSUSED */
 335 static void
 336 svc_zoneshutdown(zoneid_t zoneid, void *arg)
 337 {
 338         struct svc_globals *svc = arg;
 339         SVCPOOL *pool;
 340 
 341         mutex_enter(&svc->svc_plock);
 342         while ((pool = svc->svc_pools) != NULL) {
 343                 svc_pool_unregister(svc, pool);
 344         }
 345         mutex_exit(&svc->svc_plock);
 346 }
 347 
 348 /* ARGSUSED */
 349 static void
 350 svc_zonefini(zoneid_t zoneid, void *arg)
 351 {
 352         struct svc_globals *svc = arg;
 353 
 354         ASSERT(svc->svc_pools == NULL);
 355         mutex_destroy(&svc->svc_plock);
 356         kmem_free(svc, sizeof (*svc));
 357 }
 358 
 359 /*
 360  * Global SVC init routine.
 361  * Initialize global generic and transport type specific structures
 362  * used by the kernel RPC server side. This routine is called only
 363  * once when the module is being loaded.
 364  */
 365 void
 366 svc_init()
 367 {
 368         zone_key_create(&svc_zone_key, svc_zoneinit, svc_zoneshutdown,
 369             svc_zonefini);
 370         svc_cots_init();
 371         svc_clts_init();
 372 }
 373 
 374 /*
 375  * Destroy the SVCPOOL structure.
 376  */
 377 static void
 378 svc_pool_cleanup(SVCPOOL *pool)
 379 {
 380         ASSERT(pool->p_threads + pool->p_detached_threads == 0);
 381         ASSERT(pool->p_lcount == 0);
 382         ASSERT(pool->p_closing);
 383 
 384         /*
  385          * Call the user-supplied shutdown function.  This is done
  386          * here so the user of the pool will be able to clean up
  387          * service-related resources.
 388          */
 389         if (pool->p_shutdown != NULL)
 390                 (pool->p_shutdown)();
 391 
 392         /* Destroy `xprt-ready' queue */
 393         svc_xprt_qdestroy(pool);
 394 
 395         /* Destroy transport list */
 396         rw_destroy(&pool->p_lrwlock);
 397 
 398         /* Destroy locks and condition variables */
 399         mutex_destroy(&pool->p_thread_lock);
 400         mutex_destroy(&pool->p_req_lock);
 401         cv_destroy(&pool->p_req_cv);
 402 
 403         /* Destroy creator's locks and condition variables */
 404         mutex_destroy(&pool->p_creator_lock);
 405         cv_destroy(&pool->p_creator_cv);
 406         mutex_destroy(&pool->p_user_lock);
 407         cv_destroy(&pool->p_user_cv);
 408 
 409         /* Free pool structure */
 410         kmem_free(pool, sizeof (SVCPOOL));
 411 }
 412 
 413 /*
 414  * If all the transports and service threads are already gone
 415  * signal the creator thread to clean up and exit.
 416  */
 417 static bool_t
 418 svc_pool_tryexit(SVCPOOL *pool)
 419 {
 420         ASSERT(MUTEX_HELD(&pool->p_thread_lock));
 421         ASSERT(pool->p_closing);
 422 
 423         if (pool->p_threads + pool->p_detached_threads == 0) {
 424                 rw_enter(&pool->p_lrwlock, RW_READER);
 425                 if (pool->p_lcount == 0) {
 426                         /*
 427                          * Release the locks before sending a signal.
 428                          */
 429                         rw_exit(&pool->p_lrwlock);
 430                         mutex_exit(&pool->p_thread_lock);
 431 
 432                         /*
 433                          * Notify the creator thread to clean up and exit
 434                          *
 435                          * NOTICE: No references to the pool beyond this point!
 436                          *                 The pool is being destroyed.
 437                          */
 438                         ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
 439                         svc_creator_signalexit(pool);
 440 
 441                         return (TRUE);
 442                 }
 443                 rw_exit(&pool->p_lrwlock);
 444         }
 445 
 446         ASSERT(MUTEX_HELD(&pool->p_thread_lock));
 447         return (FALSE);
 448 }
 449 
 450 /*
 451  * Find a pool with a given id.
 452  */
 453 static SVCPOOL *
 454 svc_pool_find(struct svc_globals *svc, int id)
 455 {
 456         SVCPOOL *pool;
 457 
 458         ASSERT(MUTEX_HELD(&svc->svc_plock));
 459 
 460         /*
 461          * Search the list for a pool with a matching id
  462          * and return it.
 463          */
 464         for (pool = svc->svc_pools; pool; pool = pool->p_next)
 465                 if (pool->p_id == id)
 466                         return (pool);
 467 
 468         return (NULL);
 469 }
 470 
 471 /*
 472  * PSARC 2003/523 Contract Private Interface
 473  * svc_do_run
 474  * Changes must be reviewed by Solaris File Sharing
 475  * Changes must be communicated to contract-2003-523@sun.com
 476  */
 477 int
 478 svc_do_run(int id)
 479 {
 480         SVCPOOL *pool;
 481         int err = 0;
 482         struct svc_globals *svc;
 483 
 484         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
 485         mutex_enter(&svc->svc_plock);
 486 
 487         pool = svc_pool_find(svc, id);
 488 
 489         mutex_exit(&svc->svc_plock);
 490 
 491         if (pool == NULL)
 492                 return (ENOENT);
 493 
 494         /*
 495          * Increment counter of pool threads now
 496          * that a thread has been created.
 497          */
 498         mutex_enter(&pool->p_thread_lock);
 499         pool->p_threads++;
 500         mutex_exit(&pool->p_thread_lock);
 501 
 502         /* Give work to the new thread. */
 503         err = svc_run(pool);
 504 
 505         return (err);
 506 }
 507 
 508 /*
 509  * Unregister a pool from the pool list.
 510  * Set the closing state. If all the transports and service threads
 511  * are already gone signal the creator thread to clean up and exit.
 512  */
 513 static void
 514 svc_pool_unregister(struct svc_globals *svc, SVCPOOL *pool)
 515 {
 516         SVCPOOL *next = pool->p_next;
 517         SVCPOOL *prev = pool->p_prev;
 518 
 519         ASSERT(MUTEX_HELD(&svc->svc_plock));
 520 
 521         /* Remove from the list */
 522         if (pool == svc->svc_pools)
 523                 svc->svc_pools = next;
 524         if (next)
 525                 next->p_prev = prev;
 526         if (prev)
 527                 prev->p_next = next;
 528         pool->p_next = pool->p_prev = NULL;
 529 
 530         /*
 531          * Offline the pool. Mark the pool as closing.
 532          * If there are no transports in this pool notify
 533          * the creator thread to clean it up and exit.
 534          */
 535         mutex_enter(&pool->p_thread_lock);
 536         if (pool->p_offline != NULL)
 537                 (pool->p_offline)();
 538         pool->p_closing = TRUE;
 539         if (svc_pool_tryexit(pool))
 540                 return;
 541         mutex_exit(&pool->p_thread_lock);
 542 }
 543 
 544 /*
 545  * Register a pool with a given id in the global doubly linked pool list.
 546  * - if there is a pool with the same id in the list then unregister it
 547  * - insert the new pool into the list.
 548  */
 549 static void
 550 svc_pool_register(struct svc_globals *svc, SVCPOOL *pool, int id)
 551 {
 552         SVCPOOL *old_pool;
 553 
 554         /*
 555          * If there is a pool with the same id then remove it from
 556          * the list and mark the pool as closing.
 557          */
 558         mutex_enter(&svc->svc_plock);
 559 
  560         if ((old_pool = svc_pool_find(svc, id)) != NULL)
 561                 svc_pool_unregister(svc, old_pool);
 562 
 563         /* Insert into the doubly linked list */
 564         pool->p_id = id;
 565         pool->p_next = svc->svc_pools;
 566         pool->p_prev = NULL;
 567         if (svc->svc_pools)
 568                 svc->svc_pools->p_prev = pool;
 569         svc->svc_pools = pool;
 570 
 571         mutex_exit(&svc->svc_plock);
 572 }
 573 
 574 /*
 575  * Initialize a newly created pool structure
 576  */
 577 static int
 578 svc_pool_init(SVCPOOL *pool, uint_t maxthreads, uint_t redline,
 579     uint_t qsize, uint_t timeout, uint_t stksize, uint_t max_same_xprt)
 580 {
 581         klwp_t *lwp = ttolwp(curthread);
 582 
 583         ASSERT(pool);
 584 
 585         if (maxthreads == 0)
 586                 maxthreads = svc_default_maxthreads;
 587         if (redline == 0)
 588                 redline = svc_default_redline;
 589         if (qsize == 0)
 590                 qsize = svc_default_qsize;
 591         if (timeout == 0)
 592                 timeout = svc_default_timeout;
 593         if (stksize == 0)
 594                 stksize = svc_default_stksize;
 595         if (max_same_xprt == 0)
 596                 max_same_xprt = svc_default_max_same_xprt;
 597 
 598         if (maxthreads < redline)
 599                 return (EINVAL);
 600 
 601         /* Allocate and initialize the `xprt-ready' queue */
 602         svc_xprt_qinit(pool, qsize);
 603 
 604         /* Initialize doubly-linked xprt list */
 605         rw_init(&pool->p_lrwlock, NULL, RW_DEFAULT, NULL);
 606 
 607         /*
 608          * Setting lwp_childstksz on the current lwp so that
 609          * descendants of this lwp get the modified stacksize, if
 610          * it is defined. It is important that either this lwp or
 611          * one of its descendants do the actual servicepool thread
 612          * creation to maintain the stacksize inheritance.
 613          */
 614         if (lwp != NULL)
 615                 lwp->lwp_childstksz = stksize;
 616 
 617         /* Initialize thread limits, locks and condition variables */
 618         pool->p_maxthreads = maxthreads;
 619         pool->p_redline = redline;
 620         pool->p_timeout = timeout * hz;
 621         pool->p_stksize = stksize;
 622         pool->p_max_same_xprt = max_same_xprt;
 623         mutex_init(&pool->p_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 624         mutex_init(&pool->p_req_lock, NULL, MUTEX_DEFAULT, NULL);
 625         cv_init(&pool->p_req_cv, NULL, CV_DEFAULT, NULL);
 626 
 627         /* Initialize userland creator */
 628         pool->p_user_exit = FALSE;
 629         pool->p_signal_create_thread = FALSE;
 630         pool->p_user_waiting = FALSE;
 631         mutex_init(&pool->p_user_lock, NULL, MUTEX_DEFAULT, NULL);
 632         cv_init(&pool->p_user_cv, NULL, CV_DEFAULT, NULL);
 633 
 634         /* Initialize the creator and start the creator thread */
 635         pool->p_creator_exit = FALSE;
 636         mutex_init(&pool->p_creator_lock, NULL, MUTEX_DEFAULT, NULL);
 637         cv_init(&pool->p_creator_cv, NULL, CV_DEFAULT, NULL);
 638 
 639         (void) zthread_create(NULL, pool->p_stksize, svc_thread_creator,
 640             pool, 0, minclsyspri);
 641 
 642         return (0);
 643 }
 644 
 645 /*
 646  * PSARC 2003/523 Contract Private Interface
 647  * svc_pool_create
 648  * Changes must be reviewed by Solaris File Sharing
 649  * Changes must be communicated to contract-2003-523@sun.com
 650  *
  651  * Create a kernel RPC server-side thread/transport pool.
 652  *
  653  * This is the public interface for creating a server RPC thread pool
  654  * for a given service provider. Transports registered with the pool's id
  655  * will be served by the pool's threads. This function is called from the
 656  * nfssys() system call.
 657  */
 658 int
 659 svc_pool_create(struct svcpool_args *args)
 660 {
 661         SVCPOOL *pool;
 662         int error;
 663         struct svc_globals *svc;
 664 
 665         /*
 666          * Caller should check credentials in a way appropriate
 667          * in the context of the call.
 668          */
 669 
 670         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
 671         /* Allocate a new pool */
 672         pool = kmem_zalloc(sizeof (SVCPOOL), KM_SLEEP);
 673 
 674         /*
 675          * Initialize the pool structure and create a creator thread.
 676          */
 677         error = svc_pool_init(pool, args->maxthreads, args->redline,
 678             args->qsize, args->timeout, args->stksize, args->max_same_xprt);
 679 
 680         if (error) {
 681                 kmem_free(pool, sizeof (SVCPOOL));
 682                 return (error);
 683         }
 684 
 685         /* Register the pool with the global pool list */
 686         svc_pool_register(svc, pool, args->id);
 687 
 688         return (0);
 689 }
 690 
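/*
 * For illustration, a sketch of how a userland agent might fill in
 * struct svcpool_args for the call above (SVCPOOL_CREATE and
 * NFS_SVCPOOL_ID are assumed names here; fields left zero select the
 * svc_default_* values):
 *
 *	struct svcpool_args args;
 *
 *	bzero(&args, sizeof (args));
 *	args.id = NFS_SVCPOOL_ID;
 *	args.maxthreads = 0;
 *	(void) _nfssys(SVCPOOL_CREATE, &args);
 */
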
 691 int
 692 svc_pool_control(int id, int cmd, void *arg)
 693 {
 694         SVCPOOL *pool;
 695         struct svc_globals *svc;
 696 
 697         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
 698 
 699         switch (cmd) {
 700         case SVCPSET_SHUTDOWN_PROC:
 701                 /*
 702                  * Search the list for a pool with a matching id
  703                  * and register the shutdown procedure with that pool.
 704                  */
 705                 mutex_enter(&svc->svc_plock);
 706 
 707                 if ((pool = svc_pool_find(svc, id)) == NULL) {
 708                         mutex_exit(&svc->svc_plock);
 709                         return (ENOENT);
 710                 }
 711                 /*
 712                  * Grab the transport list lock before releasing the
 713                  * pool list lock
 714                  */
 715                 rw_enter(&pool->p_lrwlock, RW_WRITER);
 716                 mutex_exit(&svc->svc_plock);
 717 
 718                 pool->p_shutdown = *((void (*)())arg);
 719 
 720                 rw_exit(&pool->p_lrwlock);
 721 
 722                 return (0);
 723         case SVCPSET_UNREGISTER_PROC:
 724                 /*
 725                  * Search the list for a pool with a matching id
  726                  * and register the unregister callback with that pool.
 727                  */
 728                 mutex_enter(&svc->svc_plock);
 729 
 730                 if ((pool = svc_pool_find(svc, id)) == NULL) {
 731                         mutex_exit(&svc->svc_plock);
 732                         return (ENOENT);
 733                 }
 734                 /*
 735                  * Grab the transport list lock before releasing the
 736                  * pool list lock
 737                  */
 738                 rw_enter(&pool->p_lrwlock, RW_WRITER);
 739                 mutex_exit(&svc->svc_plock);
 740 
 741                 pool->p_offline = *((void (*)())arg);
 742 
 743                 rw_exit(&pool->p_lrwlock);
 744 
 745                 return (0);
 746         default:
 747                 return (EINVAL);
 748         }
 749 }
 750 
 751 /*
 752  * Pool's transport list manipulation routines.
 753  * - svc_xprt_register()
 754  * - svc_xprt_unregister()
 755  *
 756  * svc_xprt_register() is called from svc_tli_kcreate() to
 757  * insert a new master transport handle into the doubly linked
 758  * list of server transport handles (one list per pool).
 759  *
 760  * The list is used by svc_poll(), when it operates in `drain'
  761  * mode, to search for the next transport with a pending request.
 762  */
 763 
 764 int
 765 svc_xprt_register(SVCMASTERXPRT *xprt, int id)
 766 {
 767         SVCMASTERXPRT *prev, *next;
 768         SVCPOOL *pool;
 769         struct svc_globals *svc;
 770 
 771         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
 772         /*
 773          * Search the list for a pool with a matching id
 774          * and register the transport handle with that pool.
 775          */
 776         mutex_enter(&svc->svc_plock);
 777 
 778         if ((pool = svc_pool_find(svc, id)) == NULL) {
 779                 mutex_exit(&svc->svc_plock);
 780                 return (ENOENT);
 781         }
 782 
 783         /* Grab the transport list lock before releasing the pool list lock */
 784         rw_enter(&pool->p_lrwlock, RW_WRITER);
 785         mutex_exit(&svc->svc_plock);
 786 
 787         /* Don't register new transports when the pool is in closing state */
 788         if (pool->p_closing) {
 789                 rw_exit(&pool->p_lrwlock);
 790                 return (EBUSY);
 791         }
 792 
 793         /*
 794          * Initialize xp_pool to point to the pool.
 795          * We don't want to go through the pool list every time.
 796          */
 797         xprt->xp_pool = pool;
 798 
 799         /*
 800          * Insert a transport handle into the list.
 801          * The list head points to the most recently inserted transport.
 802          */
 803         if (pool->p_lhead == NULL)
 804                 pool->p_lhead = xprt->xp_prev = xprt->xp_next = xprt;
 805         else {
 806                 next = pool->p_lhead;
 807                 prev = pool->p_lhead->xp_prev;
 808 
 809                 xprt->xp_next = next;
 810                 xprt->xp_prev = prev;
 811 
 812                 pool->p_lhead = prev->xp_next = next->xp_prev = xprt;
 813         }
 814 
 815         /* Increment the transports count */
 816         pool->p_lcount++;
 817 
 818         rw_exit(&pool->p_lrwlock);
 819         return (0);
 820 }
 821 
 822 /*
 823  * Called from svc_xprt_cleanup() to remove a master transport handle
 824  * from the pool's list of server transports (when a transport is
 825  * being destroyed).
 826  */
 827 void
 828 svc_xprt_unregister(SVCMASTERXPRT *xprt)
 829 {
 830         SVCPOOL *pool = xprt->xp_pool;
 831 
 832         /*
 833          * Unlink xprt from the list.
 834          * If the list head points to this xprt then move it
 835          * to the next xprt or reset to NULL if this is the last
 836          * xprt in the list.
 837          */
 838         rw_enter(&pool->p_lrwlock, RW_WRITER);
 839 
 840         if (xprt == xprt->xp_next)
 841                 pool->p_lhead = NULL;
 842         else {
 843                 SVCMASTERXPRT *next = xprt->xp_next;
 844                 SVCMASTERXPRT *prev = xprt->xp_prev;
 845 
 846                 next->xp_prev = prev;
 847                 prev->xp_next = next;
 848 
 849                 if (pool->p_lhead == xprt)
 850                         pool->p_lhead = next;
 851         }
 852 
 853         xprt->xp_next = xprt->xp_prev = NULL;
 854 
 855         /* Decrement list count */
 856         pool->p_lcount--;
 857 
 858         rw_exit(&pool->p_lrwlock);
 859 }
 860 
 861 static void
 862 svc_xprt_qdestroy(SVCPOOL *pool)
 863 {
 864         mutex_destroy(&pool->p_qend_lock);
 865         kmem_free(pool->p_qbody, pool->p_qsize * sizeof (__SVCXPRT_QNODE));
 866 }
 867 
 868 /*
 869  * Initialize an `xprt-ready' queue for a given pool.
 870  */
 871 static void
 872 svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
 873 {
 874         int i;
 875 
 876         pool->p_qsize = qsize;
 877         pool->p_qbody = kmem_zalloc(pool->p_qsize * sizeof (__SVCXPRT_QNODE),
 878             KM_SLEEP);
 879 
 880         for (i = 0; i < pool->p_qsize - 1; i++)
 881                 pool->p_qbody[i].q_next = &(pool->p_qbody[i+1]);
 882 
 883         pool->p_qbody[pool->p_qsize-1].q_next = &(pool->p_qbody[0]);
 884         pool->p_qtop = &(pool->p_qbody[0]);
 885         pool->p_qend = &(pool->p_qbody[0]);
 886 
 887         mutex_init(&pool->p_qend_lock, NULL, MUTEX_DEFAULT, NULL);
 888 }
 889 
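/*
 * Note that with the circular-buffer conventions used by svc_xprt_qput()
 * and svc_xprt_qget() below (the queue is full when p_qtop->q_next ==
 * p_qend and empty when p_qend == p_qtop) a queue of p_qsize nodes holds
 * at most p_qsize - 1 hints, e.g. 255 hints for the default 256-node
 * queue.
 */
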
 890 /*
  891  * Called from the svc_queuereq() interrupt routine to queue
  892  * a hint for svc_poll() about which transport has a pending request.
 893  * - insert a pointer to xprt into the xprt-ready queue (FIFO)
 894  * - if the xprt-ready queue is full turn the overflow flag on.
 895  *
 896  * NOTICE: pool->p_qtop is protected by the pool's request lock
 897  * and the caller (svc_queuereq()) must hold the lock.
 898  */
 899 static void
 900 svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
 901 {
 902         ASSERT(MUTEX_HELD(&pool->p_req_lock));
 903 
 904         /* If the overflow flag is on there is nothing we can do */
 905         if (pool->p_qoverflow)
 906                 return;
 907 
 908         /* If the queue is full turn the overflow flag on and exit */
 909         if (pool->p_qtop->q_next == pool->p_qend) {
 910                 mutex_enter(&pool->p_qend_lock);
 911                 if (pool->p_qtop->q_next == pool->p_qend) {
 912                         pool->p_qoverflow = TRUE;
 913                         mutex_exit(&pool->p_qend_lock);
 914                         return;
 915                 }
 916                 mutex_exit(&pool->p_qend_lock);
 917         }
 918 
 919         /* Insert a hint and move pool->p_qtop */
 920         pool->p_qtop->q_xprt = xprt;
 921         pool->p_qtop = pool->p_qtop->q_next;
 922 }
 923 
 924 /*
  925  * Called from svc_poll() to get a hint about which transport has a
  926  * pending request. Returns a pointer to a transport or NULL if the
 927  * `xprt-ready' queue is empty.
 928  *
 929  * Since we do not acquire the pool's request lock while checking if
 930  * the queue is empty we may miss a request that is just being delivered.
  931  * However, this is ok since svc_poll() will retry until the
 932  * count indicates that there are pending requests for this pool.
 933  */
 934 static SVCMASTERXPRT *
 935 svc_xprt_qget(SVCPOOL *pool)
 936 {
 937         SVCMASTERXPRT *xprt;
 938 
 939         mutex_enter(&pool->p_qend_lock);
 940         do {
 941                 /*
 942                  * If the queue is empty return NULL.
 943                  * Since we do not acquire the pool's request lock which
  944                  * protects pool->p_qtop, this is not an exact check. However,
  945                  * this is safe - if we miss a request here svc_poll()
  946                  * will retry.
 947                  */
 948                 if (pool->p_qend == pool->p_qtop) {
 949                         mutex_exit(&pool->p_qend_lock);
 950                         return (NULL);
 951                 }
 952 
 953                 /* Get a hint and move pool->p_qend */
 954                 xprt = pool->p_qend->q_xprt;
 955                 pool->p_qend = pool->p_qend->q_next;
 956 
  957                 /* Skip hints deleted by svc_xprt_qdelete() */
 958         } while (xprt == NULL);
 959         mutex_exit(&pool->p_qend_lock);
 960 
 961         return (xprt);
 962 }
 963 
 964 /*
 965  * Delete all the references to a transport handle that
 966  * is being destroyed from the xprt-ready queue.
 967  * Deleted pointers are replaced with NULLs.
 968  */
 969 static void
 970 svc_xprt_qdelete(SVCPOOL *pool, SVCMASTERXPRT *xprt)
 971 {
 972         __SVCXPRT_QNODE *q;
 973 
 974         mutex_enter(&pool->p_req_lock);
 975         for (q = pool->p_qend; q != pool->p_qtop; q = q->q_next) {
 976                 if (q->q_xprt == xprt)
 977                         q->q_xprt = NULL;
 978         }
 979         mutex_exit(&pool->p_req_lock);
 980 }
 981 
 982 /*
 983  * Destructor for a master server transport handle.
 984  * - if there are no more non-detached threads linked to this transport
 985  *   then, if requested, call xp_closeproc (we don't wait for detached
 986  *   threads linked to this transport to complete).
 987  * - if there are no more threads linked to this
 988  *   transport then
 989  *   a) remove references to this transport from the xprt-ready queue
 990  *   b) remove a reference to this transport from the pool's transport list
 991  *   c) call a transport specific `destroy' function
 992  *   d) cancel remaining thread reservations.
 993  *
 994  * NOTICE: Caller must hold the transport's thread lock.
 995  */
 996 static void
 997 svc_xprt_cleanup(SVCMASTERXPRT *xprt, bool_t detached)
 998 {
 999         ASSERT(MUTEX_HELD(&xprt->xp_thread_lock));
1000         ASSERT(xprt->xp_wq == NULL);
1001 
1002         /*
1003          * If called from the last non-detached thread
1004          * it should call the closeproc on this transport.
1005          */
1006         if (!detached && xprt->xp_threads == 0 && xprt->xp_closeproc) {
1007                 (*(xprt->xp_closeproc)) (xprt);
1008         }
1009 
1010         if (xprt->xp_threads + xprt->xp_detached_threads > 0)
1011                 mutex_exit(&xprt->xp_thread_lock);
1012         else {
1013                 /* Remove references to xprt from the `xprt-ready' queue */
1014                 svc_xprt_qdelete(xprt->xp_pool, xprt);
1015 
1016                 /* Unregister xprt from the pool's transport list */
1017                 svc_xprt_unregister(xprt);
1018                 svc_callout_free(xprt);
1019                 SVC_DESTROY(xprt);
1020         }
1021 }
1022 
1023 /*
1024  * Find a dispatch routine for a given prog/vers pair.
1025  * This function is called from svc_getreq() to search the callout
1026  * table for an entry with a matching RPC program number `prog'
1027  * and a version range that covers `vers'.
1028  * - if it finds a matching entry it returns pointer to the dispatch routine
 1029  * - otherwise it returns NULL and fills `vers_min' and `vers_max' with,
 1030  *   respectively, the lowest version and highest version
 1031  *   supported for the program `prog'.
1032  */
1033 static SVC_DISPATCH *
1034 svc_callout_find(SVCXPRT *xprt, rpcprog_t prog, rpcvers_t vers,
1035     rpcvers_t *vers_min, rpcvers_t *vers_max)
1036 {
1037         SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
1038         int i;
1039 
1040         *vers_min = ~(rpcvers_t)0;
1041         *vers_max = 0;
1042 
1043         for (i = 0; i < sct->sct_size; i++) {
1044                 SVC_CALLOUT *sc = &sct->sct_sc[i];
1045 
1046                 if (prog == sc->sc_prog) {
1047                         if (vers >= sc->sc_versmin && vers <= sc->sc_versmax)
1048                                 return (sc->sc_dispatch);
1049 
1050                         if (*vers_max < sc->sc_versmax)
1051                                 *vers_max = sc->sc_versmax;
1052                         if (*vers_min > sc->sc_versmin)
1053                                 *vers_min = sc->sc_versmin;
1054                 }
1055         }
1056 
1057         return (NULL);
1058 }
1059 
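/*
 * For illustration, a sketch of the callout table a service provider
 * might construct for svc_callout_find() to search (the program and
 * version numbers, my_dispatch, and the positional field order are
 * example assumptions):
 *
 *	static SVC_CALLOUT my_sc[] = {
 *		{ MY_PROGRAM, MY_VERSMIN, MY_VERSMAX, my_dispatch }
 *	};
 *	static SVC_CALLOUT_TABLE my_sct = {
 *		sizeof (my_sc) / sizeof (my_sc[0]), FALSE, my_sc
 *	};
 */
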
1060 /*
1061  * Optionally free callout table allocated for this transport by
1062  * the service provider.
1063  */
1064 static void
1065 svc_callout_free(SVCMASTERXPRT *xprt)
1066 {
1067         SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
1068 
1069         if (sct->sct_free) {
1070                 kmem_free(sct->sct_sc, sct->sct_size * sizeof (SVC_CALLOUT));
1071                 kmem_free(sct, sizeof (SVC_CALLOUT_TABLE));
1072         }
1073 }
1074 
1075 /*
1076  * Send a reply to an RPC request
1077  *
1078  * PSARC 2003/523 Contract Private Interface
1079  * svc_sendreply
1080  * Changes must be reviewed by Solaris File Sharing
1081  * Changes must be communicated to contract-2003-523@sun.com
1082  */
1083 bool_t
1084 svc_sendreply(const SVCXPRT *clone_xprt, const xdrproc_t xdr_results,
1085     const caddr_t xdr_location)
1086 {
1087         struct rpc_msg rply;
1088 
1089         rply.rm_direction = REPLY;
1090         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1091         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1092         rply.acpted_rply.ar_stat = SUCCESS;
1093         rply.acpted_rply.ar_results.where = xdr_location;
1094         rply.acpted_rply.ar_results.proc = xdr_results;
1095 
1096         return (SVC_REPLY((SVCXPRT *)clone_xprt, &rply));
1097 }
1098 
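/*
 * For illustration, a sketch of how a dispatch routine might use
 * svc_sendreply() (my_res and xdr_my_res are example names):
 *
 *	static void
 *	my_dispatch(struct svc_req *req, SVCXPRT *xprt)
 *	{
 *		struct my_res res;
 *
 *		bzero(&res, sizeof (res));
 *		if (!svc_sendreply(xprt, xdr_my_res, (caddr_t)&res))
 *			svcerr_systemerr(xprt);
 *	}
 */
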
1099 /*
1100  * No procedure error reply
1101  *
1102  * PSARC 2003/523 Contract Private Interface
1103  * svcerr_noproc
1104  * Changes must be reviewed by Solaris File Sharing
1105  * Changes must be communicated to contract-2003-523@sun.com
1106  */
1107 void
1108 svcerr_noproc(const SVCXPRT *clone_xprt)
1109 {
1110         struct rpc_msg rply;
1111 
1112         rply.rm_direction = REPLY;
1113         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1114         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1115         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
1116         SVC_FREERES((SVCXPRT *)clone_xprt);
1117         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1118 }
1119 
1120 /*
1121  * Can't decode arguments error reply
1122  *
1123  * PSARC 2003/523 Contract Private Interface
1124  * svcerr_decode
1125  * Changes must be reviewed by Solaris File Sharing
1126  * Changes must be communicated to contract-2003-523@sun.com
1127  */
1128 void
1129 svcerr_decode(const SVCXPRT *clone_xprt)
1130 {
1131         struct rpc_msg rply;
1132 
1133         rply.rm_direction = REPLY;
1134         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1135         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1136         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
1137         SVC_FREERES((SVCXPRT *)clone_xprt);
1138         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1139 }
1140 
1141 /*
1142  * Some system error
1143  */
1144 void
1145 svcerr_systemerr(const SVCXPRT *clone_xprt)
1146 {
1147         struct rpc_msg rply;
1148 
1149         rply.rm_direction = REPLY;
1150         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1151         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1152         rply.acpted_rply.ar_stat = SYSTEM_ERR;
1153         SVC_FREERES((SVCXPRT *)clone_xprt);
1154         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1155 }
1156 
1157 /*
1158  * Authentication error reply
1159  */
1160 void
1161 svcerr_auth(const SVCXPRT *clone_xprt, const enum auth_stat why)
1162 {
1163         struct rpc_msg rply;
1164 
1165         rply.rm_direction = REPLY;
1166         rply.rm_reply.rp_stat = MSG_DENIED;
1167         rply.rjcted_rply.rj_stat = AUTH_ERROR;
1168         rply.rjcted_rply.rj_why = why;
1169         SVC_FREERES((SVCXPRT *)clone_xprt);
1170         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1171 }
1172 
1173 /*
1174  * Authentication too weak error reply
1175  */
1176 void
1177 svcerr_weakauth(const SVCXPRT *clone_xprt)
1178 {
1179         svcerr_auth((SVCXPRT *)clone_xprt, AUTH_TOOWEAK);
1180 }
1181 
1182 /*
1183  * Authentication error; bad credentials
1184  */
1185 void
1186 svcerr_badcred(const SVCXPRT *clone_xprt)
1187 {
1188         struct rpc_msg rply;
1189 
1190         rply.rm_direction = REPLY;
1191         rply.rm_reply.rp_stat = MSG_DENIED;
1192         rply.rjcted_rply.rj_stat = AUTH_ERROR;
1193         rply.rjcted_rply.rj_why = AUTH_BADCRED;
1194         SVC_FREERES((SVCXPRT *)clone_xprt);
1195         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1196 }
1197 
1198 /*
1199  * Program unavailable error reply
1200  *
1201  * PSARC 2003/523 Contract Private Interface
1202  * svcerr_noprog
1203  * Changes must be reviewed by Solaris File Sharing
1204  * Changes must be communicated to contract-2003-523@sun.com
1205  */
1206 void
1207 svcerr_noprog(const SVCXPRT *clone_xprt)
1208 {
1209         struct rpc_msg rply;
1210 
1211         rply.rm_direction = REPLY;
1212         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1213         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1214         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
1215         SVC_FREERES((SVCXPRT *)clone_xprt);
1216         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1217 }
1218 
1219 /*
1220  * Program version mismatch error reply
1221  *
1222  * PSARC 2003/523 Contract Private Interface
1223  * svcerr_progvers
1224  * Changes must be reviewed by Solaris File Sharing
1225  * Changes must be communicated to contract-2003-523@sun.com
1226  */
1227 void
1228 svcerr_progvers(const SVCXPRT *clone_xprt,
1229     const rpcvers_t low_vers, const rpcvers_t high_vers)
1230 {
1231         struct rpc_msg rply;
1232 
1233         rply.rm_direction = REPLY;
1234         rply.rm_reply.rp_stat = MSG_ACCEPTED;
1235         rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
1236         rply.acpted_rply.ar_stat = PROG_MISMATCH;
1237         rply.acpted_rply.ar_vers.low = low_vers;
1238         rply.acpted_rply.ar_vers.high = high_vers;
1239         SVC_FREERES((SVCXPRT *)clone_xprt);
1240         SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
1241 }
1242 
1243 /*
1244  * Get server side input from some transport.
1245  *
1246  * Statement of authentication parameters management:
1247  * This function owns and manages all authentication parameters, specifically
1248  * the "raw" parameters (msg.rm_call.cb_cred and msg.rm_call.cb_verf) and
1249  * the "cooked" credentials (rqst->rq_clntcred).
 1250  * However, this function does not know the structure of the cooked
 1251  * credentials, so it makes the following assumptions:
1252  *   a) the structure is contiguous (no pointers), and
1253  *   b) the cred structure size does not exceed RQCRED_SIZE bytes.
1254  * In all events, all three parameters are freed upon exit from this routine.
1255  * The storage is trivially managed on the call stack in user land, but
1256  * is malloced in kernel land.
1257  *
1258  * Note: the xprt's xp_svc_lock is not held while the service's dispatch
1259  * routine is running.  If we decide to implement svc_unregister(), we'll
1260  * need to decide whether it's okay for a thread to unregister a service
1261  * while a request is being processed.  If we decide that this is a
1262  * problem, we can probably use some sort of reference counting scheme to
1263  * keep the callout entry from going away until the request has completed.
1264  */
1265 static void
1266 svc_getreq(
1267         SVCXPRT *clone_xprt,    /* clone transport handle */
1268         mblk_t *mp)
1269 {
1270         struct rpc_msg msg;
1271         struct svc_req r;
1272         char  *cred_area;       /* too big to allocate on call stack */
1273 
1274         TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_START,
1275             "svc_getreq_start:");
1276 
1277         ASSERT(clone_xprt->xp_master != NULL);
1278         ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
1279             mp->b_datap->db_type != M_DATA);
1280 
1281         /*
1282          * Firstly, allocate the authentication parameters' storage
1283          */
1284         mutex_enter(&rqcred_lock);
1285         if (rqcred_head) {
1286                 cred_area = rqcred_head;
1287 
1288                 /* LINTED pointer alignment */
1289                 rqcred_head = *(caddr_t *)rqcred_head;
1290                 mutex_exit(&rqcred_lock);
1291         } else {
1292                 mutex_exit(&rqcred_lock);
1293                 cred_area = kmem_alloc(2 * MAX_AUTH_BYTES + RQCRED_SIZE,
1294                     KM_SLEEP);
1295         }
1296         msg.rm_call.cb_cred.oa_base = cred_area;
1297         msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
1298         r.rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
1299 
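        /*
         * Layout of cred_area as wired up above (widths not to scale):
         *
         *	0                 MAX_AUTH_BYTES    2*MAX_AUTH_BYTES
         *	+-----------------+-----------------+---------------+
         *	| cb_cred.oa_base | cb_verf.oa_base | rq_clntcred   |
         *	+-----------------+-----------------+---------------+
         *	                                     (RQCRED_SIZE)
         */
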
1300         /*
 1301          * The underlying transport recv routine may modify the mblk
 1302          * data and make it difficult to extract the label afterwards,
 1303          * so get the label from the raw mblk data now.
1304          */
1305         if (is_system_labeled()) {
1306                 cred_t *cr;
1307 
1308                 r.rq_label = kmem_alloc(sizeof (bslabel_t), KM_SLEEP);
1309                 cr = msg_getcred(mp, NULL);
1310                 ASSERT(cr != NULL);
1311 
1312                 bcopy(label2bslabel(crgetlabel(cr)), r.rq_label,
1313                     sizeof (bslabel_t));
1314         } else {
1315                 r.rq_label = NULL;
1316         }
1317 
1318         /*
1319          * Now receive a message from the transport.
1320          */
1321         if (SVC_RECV(clone_xprt, mp, &msg)) {
1322                 void (*dispatchroutine) (struct svc_req *, SVCXPRT *);
1323                 rpcvers_t vers_min;
1324                 rpcvers_t vers_max;
1325                 bool_t no_dispatch;
1326                 enum auth_stat why;
1327 
1328                 /*
1329                  * Find the registered program and call its
1330                  * dispatch routine.
1331                  */
1332                 r.rq_xprt = clone_xprt;
1333                 r.rq_prog = msg.rm_call.cb_prog;
1334                 r.rq_vers = msg.rm_call.cb_vers;
1335                 r.rq_proc = msg.rm_call.cb_proc;
1336                 r.rq_cred = msg.rm_call.cb_cred;
1337 
1338                 /*
1339                  * First authenticate the message.
1340                  */
1341                 TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_START,
1342                     "svc_getreq_auth_start:");
1343                 if ((why = sec_svc_msg(&r, &msg, &no_dispatch)) != AUTH_OK) {
1344                         TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
1345                             "svc_getreq_auth_end:(%S)", "failed");
1346                         svcerr_auth(clone_xprt, why);
1347                         /*
1348                          * Free the arguments.
1349                          */
1350                         (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1351                 } else if (no_dispatch) {
1352                         /*
1353                          * XXX - when bug id 4053736 is done, remove
1354                          * the SVC_FREEARGS() call.
1355                          */
1356                         (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1357                 } else {
1358                         TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
1359                             "svc_getreq_auth_end:(%S)", "good");
1360 
1361                         dispatchroutine = svc_callout_find(clone_xprt,
1362                             r.rq_prog, r.rq_vers, &vers_min, &vers_max);
1363 
1364                         if (dispatchroutine) {
1365                                 (*dispatchroutine) (&r, clone_xprt);
1366                         } else {
1367                                 /*
1368                                  * If we got here, the program or version
1369                                  * is not served ...
1370                                  */
1371                                 if (vers_max == 0 ||
1372                                     version_keepquiet(clone_xprt))
1373                                         svcerr_noprog(clone_xprt);
1374                                 else
1375                                         svcerr_progvers(clone_xprt, vers_min,
1376                                             vers_max);
1377 
1378                                 /*
1379                                  * Free the arguments. For successful calls
1380                                  * this is done by the dispatch routine.
1381                                  */
1382                                 (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1383                                 /* Fall through to ... */
1384                         }
1385                         /*
1386                          * Call cleanup procedure for RPCSEC_GSS.
1387                          * This is a hack since there is currently no
1388                          * op, such as SVC_CLEANAUTH. rpc_gss_cleanup
1389                          * should only be called for a non null proc.
1390                          * Null procs in RPC GSS are overloaded to
1391                          * provide context setup and control. The main
1392                          * purpose of rpc_gss_cleanup is to decrement the
1393                          * reference count associated with the cached
1394                          * GSS security context. We should never get here
1395                          * for an RPCSEC_GSS null proc since *no_dispatch
1396                          * would have been set to true from sec_svc_msg above.
1397                          */
1398                         if (r.rq_cred.oa_flavor == RPCSEC_GSS)
1399                                 rpc_gss_cleanup(clone_xprt);
1400                 }
1401         }
1402 
1403         if (r.rq_label != NULL)
1404                 kmem_free(r.rq_label, sizeof (bslabel_t));
1405 
1406         /*
1407          * Free authentication parameters' storage
1408          */
1409         mutex_enter(&rqcred_lock);
1410         /* LINTED pointer alignment */
1411         *(caddr_t *)cred_area = rqcred_head;
1412         rqcred_head = cred_area;
1413         mutex_exit(&rqcred_lock);
1414 }
1415 
1416 /*
1417  * Allocate new clone transport handle.
1418  */
1419 SVCXPRT *
1420 svc_clone_init(void)
1421 {
1422         SVCXPRT *clone_xprt;
1423 
1424         clone_xprt = kmem_zalloc(sizeof (SVCXPRT), KM_SLEEP);
1425         clone_xprt->xp_cred = crget();
1426         return (clone_xprt);
1427 }
1428 
1429 /*
1430  * Free memory allocated by svc_clone_init.
1431  */
1432 void
1433 svc_clone_free(SVCXPRT *clone_xprt)
1434 {
 1435         /* Free the credentials from crget() */
1436         if (clone_xprt->xp_cred)
1437                 crfree(clone_xprt->xp_cred);
1438         kmem_free(clone_xprt, sizeof (SVCXPRT));
1439 }
1440 
1441 /*
1442  * Link a per-thread clone transport handle to a master
1443  * - increment a thread reference count on the master
1444  * - copy some of the master's fields to the clone
1445  * - call a transport specific clone routine.
1446  */
1447 void
1448 svc_clone_link(SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt, SVCXPRT *clone_xprt2)
1449 {
1450         cred_t *cred = clone_xprt->xp_cred;
1451 
1452         ASSERT(cred);
1453 
1454         /*
1455          * Bump up master's thread count.
1456          * Linking a per-thread clone transport handle to a master
1457          * associates a service thread with the master.
1458          */
1459         mutex_enter(&xprt->xp_thread_lock);
1460         xprt->xp_threads++;
1461         mutex_exit(&xprt->xp_thread_lock);
1462 
1463         /* Clear everything */
1464         bzero(clone_xprt, sizeof (SVCXPRT));
1465 
 1466         /* Set pointer to the master transport structure */
1467         clone_xprt->xp_master = xprt;
1468 
1469         /* Structure copy of all the common fields */
1470         clone_xprt->xp_xpc = xprt->xp_xpc;
1471 
1472         /* Restore per-thread fields (xp_cred) */
1473         clone_xprt->xp_cred = cred;
1474 
1475         if (clone_xprt2)
1476                 SVC_CLONE_XPRT(clone_xprt2, clone_xprt);
1477 }
1478 
1479 /*
1480  * Unlink a non-detached clone transport handle from a master
1481  * - decrement a thread reference count on the master
1482  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
 1483  *   if this is the last non-detached thread, or the last thread overall,
 1484  *   on this transport then it will close/destroy the transport
1485  * - call transport specific function to destroy the clone handle
1486  * - clear xp_master to avoid recursion.
1487  */
1488 void
1489 svc_clone_unlink(SVCXPRT *clone_xprt)
1490 {
1491         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
1492 
1493         /* This cannot be a detached thread */
1494         ASSERT(!clone_xprt->xp_detached);
1495         ASSERT(xprt->xp_threads > 0);
1496 
1497         /* Decrement a reference count on the transport */
1498         mutex_enter(&xprt->xp_thread_lock);
1499         xprt->xp_threads--;
1500 
1501         /* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
1502         if (xprt->xp_wq)
1503                 mutex_exit(&xprt->xp_thread_lock);
1504         else
1505                 svc_xprt_cleanup(xprt, FALSE);
1506 
1507         /* Call a transport specific clone `destroy' function */
1508         SVC_CLONE_DESTROY(clone_xprt);
1509 
1510         /* Clear xp_master */
1511         clone_xprt->xp_master = NULL;
1512 }
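
/*
 * For illustration, this is the pattern svc_run() below uses to switch a
 * service thread to the transport it found a request on:
 *
 *      if (next != xprt) {
 *              if (xprt)
 *                      svc_clone_unlink(clone_xprt);
 *              svc_clone_link(next, clone_xprt, NULL);
 *              xprt = next;
 *      }
 */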
1513 
1514 /*
1515  * Unlink a detached clone transport handle from a master
1516  * - decrement the thread count on the master
1517  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
1518  *   if this is the last thread on this transport then it will destroy
1519  *   the transport.
1520  * - call a transport specific function to destroy the clone handle
1521  * - clear xp_master to avoid recursion.
1522  */
1523 static void
1524 svc_clone_unlinkdetached(SVCXPRT *clone_xprt)
1525 {
1526         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
1527 
1528         /* This must be a detached thread */
1529         ASSERT(clone_xprt->xp_detached);
1530         ASSERT(xprt->xp_detached_threads > 0);
1531         ASSERT(xprt->xp_threads + xprt->xp_detached_threads > 0);
1532 
1533         /* Grab xprt->xp_thread_lock and decrement link counts */
1534         mutex_enter(&xprt->xp_thread_lock);
1535         xprt->xp_detached_threads--;
1536 
1537         /* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
1538         if (xprt->xp_wq)
1539                 mutex_exit(&xprt->xp_thread_lock);
1540         else
1541                 svc_xprt_cleanup(xprt, TRUE);
1542 
1543         /* Call transport specific clone `destroy' function */
1544         SVC_CLONE_DESTROY(clone_xprt);
1545 
1546         /* Clear xp_master */
1547         clone_xprt->xp_master = NULL;
1548 }
1549 
1550 /*
1551  * Try to exit a non-detached service thread
1552  * - check if there are enough threads left
1553  * - if this thread (i.e. its clone transport handle) is linked
1554  *   to a master transport then unlink it
1555  * - free the clone structure
1556  * - return to userland for thread exit
1557  *
1558  * If this is the last non-detached thread, or the last thread overall,
1559  * on this transport then the call to svc_clone_unlink() will,
1560  * respectively, close or destroy the transport.
1561  */
1562 static void
1563 svc_thread_exit(SVCPOOL *pool, SVCXPRT *clone_xprt)
1564 {
1565         if (clone_xprt->xp_master)
1566                 svc_clone_unlink(clone_xprt);
1567         svc_clone_free(clone_xprt);
1568 
1569         mutex_enter(&pool->p_thread_lock);
1570         pool->p_threads--;
1571         if (pool->p_closing && svc_pool_tryexit(pool))
1572                 /* return -  thread exit will be handled at user level */
1573                 return;
1574         mutex_exit(&pool->p_thread_lock);
1575 
1576         /* return -  thread exit will be handled at user level */
1577 }
1578 
1579 /*
1580  * Exit a detached service thread that returned to svc_run
1581  * - decrement the `detached thread' count for the pool
1582  * - unlink the detached clone transport handle from the master
1583  * - free the clone structure
1584  * - return to userland for thread exit
1585  *
1586  * If this is the last thread on this transport then the call
1587  * to svc_clone_unlinkdetached() will destroy the transport.
1588  */
1589 static void
1590 svc_thread_exitdetached(SVCPOOL *pool, SVCXPRT *clone_xprt)
1591 {
1592         /* This must be a detached thread */
1593         ASSERT(clone_xprt->xp_master);
1594         ASSERT(clone_xprt->xp_detached);
1595         ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
1596 
1597         svc_clone_unlinkdetached(clone_xprt);
1598         svc_clone_free(clone_xprt);
1599 
1600         mutex_enter(&pool->p_thread_lock);
1601 
1602         ASSERT(pool->p_reserved_threads >= 0);
1603         ASSERT(pool->p_detached_threads > 0);
1604 
1605         pool->p_detached_threads--;
1606         if (pool->p_closing && svc_pool_tryexit(pool))
1607                 /* return -  thread exit will be handled at user level */
1608                 return;
1609         mutex_exit(&pool->p_thread_lock);
1610 
1611         /* return -  thread exit will be handled at user level */
1612 }
1613 
1614 /*
1615  * PSARC 2003/523 Contract Private Interface
1616  * svc_wait
1617  * Changes must be reviewed by Solaris File Sharing
1618  * Changes must be communicated to contract-2003-523@sun.com
1619  */
1620 int
1621 svc_wait(int id)
1622 {
1623         SVCPOOL *pool;
1624         int     err = 0;
1625         struct svc_globals *svc;
1626 
1627         svc = zone_getspecific(svc_zone_key, curproc->p_zone);
1628         mutex_enter(&svc->svc_plock);
1629         pool = svc_pool_find(svc, id);
1630         mutex_exit(&svc->svc_plock);
1631 
1632         if (pool == NULL)
1633                 return (ENOENT);
1634 
1635         mutex_enter(&pool->p_user_lock);
1636 
1637         /* Check if there's already a user thread waiting on this pool */
1638         if (pool->p_user_waiting) {
1639                 mutex_exit(&pool->p_user_lock);
1640                 return (EBUSY);
1641         }
1642 
1643         pool->p_user_waiting = TRUE;
1644 
1645         /* Go to sleep, waiting for the signaled flag. */
1646         while (!pool->p_signal_create_thread && !pool->p_user_exit) {
1647                 if (cv_wait_sig(&pool->p_user_cv, &pool->p_user_lock) == 0) {
1648                         /* Interrupted, return to handle exit or signal */
1649                         pool->p_user_waiting = FALSE;
1650                         pool->p_signal_create_thread = FALSE;
1651                         mutex_exit(&pool->p_user_lock);
1652 
1653                         /*
1654                          * Thread has been interrupted and therefore
1655                          * the service daemon is leaving as well so
1656                          * let's go ahead and remove the service
1657                          * pool at this time.
1658                          */
1659                         mutex_enter(&svc->svc_plock);
1660                         svc_pool_unregister(svc, pool);
1661                         mutex_exit(&svc->svc_plock);
1662 
1663                         return (EINTR);
1664                 }
1665         }
1666 
1667         pool->p_signal_create_thread = FALSE;
1668         pool->p_user_waiting = FALSE;
1669 
1670         /*
1671          * About to exit the service pool. Set return value
1672          * to let the userland code know our intent. Signal
1673          * svc_thread_creator() so that it can clean up the
1674          * pool structure.
1675          */
1676         if (pool->p_user_exit) {
1677                 err = ECANCELED;
1678                 cv_signal(&pool->p_user_cv);
1679         }
1680 
1681         mutex_exit(&pool->p_user_lock);
1682 
1683         /* Return to userland with error code, for possible thread creation. */
1684         return (err);
1685 }
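
/*
 * For reference, a userland daemon is expected to drive svc_wait() in a
 * loop roughly like the sketch below (pseudo-code; the syscall wrapper and
 * the thread-start step are paraphrased, not actual interfaces):
 *
 *      for (;;) {
 *              err = <syscall that reaches svc_wait(poolid)>;
 *              if (err == 0)
 *                      <start a new daemon thread that enters svc_run()>;
 *              else
 *                      break;  (EINTR/ECANCELED: pool gone or closing,
 *                               EBUSY: another thread already waiting,
 *                               ENOENT: no such pool)
 *      }
 */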
1686 
1687 /*
1688  * `Service threads' creator thread.
1689  * The creator thread waits for a signal to create a new thread.
1690  */
1691 static void
1692 svc_thread_creator(SVCPOOL *pool)
1693 {
1694         callb_cpr_t cpr_info;   /* CPR info for the creator thread */
1695 
1696         CALLB_CPR_INIT(&cpr_info, &pool->p_creator_lock, callb_generic_cpr,
1697             "svc_thread_creator");
1698 
1699         for (;;) {
1700                 mutex_enter(&pool->p_creator_lock);
1701 
1702                 /* Check if someone set the exit flag */
1703                 if (pool->p_creator_exit)
1704                         break;
1705 
1706                 /* Clear the `signaled' flag and go to sleep */
1707                 pool->p_creator_signaled = FALSE;
1708 
1709                 CALLB_CPR_SAFE_BEGIN(&cpr_info);
1710                 cv_wait(&pool->p_creator_cv, &pool->p_creator_lock);
1711                 CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
1712 
1713                 /* Check if someone signaled to exit */
1714                 if (pool->p_creator_exit)
1715                         break;
1716 
1717                 mutex_exit(&pool->p_creator_lock);
1718 
1719                 mutex_enter(&pool->p_thread_lock);
1720 
1721                 /*
1722                  * When the pool is in the closing state and all the
1723                  * transports are gone, the creator should not create any new threads.
1724                  */
1725                 if (pool->p_closing) {
1726                         rw_enter(&pool->p_lrwlock, RW_READER);
1727                         if (pool->p_lcount == 0) {
1728                                 rw_exit(&pool->p_lrwlock);
1729                                 mutex_exit(&pool->p_thread_lock);
1730                                 continue;
1731                         }
1732                         rw_exit(&pool->p_lrwlock);
1733                 }
1734 
1735                 /*
1736                  * Create a new service thread now.
1737                  */
1738                 ASSERT(pool->p_reserved_threads >= 0);
1739                 ASSERT(pool->p_detached_threads >= 0);
1740 
1741                 if (pool->p_threads + pool->p_detached_threads <
1742                     pool->p_maxthreads) {
1743                         /*
1744                          * Signal the service pool wait thread
1745                          * only if it hasn't already been signaled.
1746                          */
1747                         mutex_enter(&pool->p_user_lock);
1748                         if (pool->p_signal_create_thread == FALSE) {
1749                                 pool->p_signal_create_thread = TRUE;
1750                                 cv_signal(&pool->p_user_cv);
1751                         }
1752                         mutex_exit(&pool->p_user_lock);
1753 
1754                 }
1755 
1756                 mutex_exit(&pool->p_thread_lock);
1757         }
1758 
1759         /*
1760          * Pool is closed. Cleanup and exit.
1761          */
1762 
1763         /* Signal userland creator thread that it can stop now. */
1764         mutex_enter(&pool->p_user_lock);
1765         pool->p_user_exit = TRUE;
1766         cv_broadcast(&pool->p_user_cv);
1767         mutex_exit(&pool->p_user_lock);
1768 
1769         /* Wait for svc_wait() to be done with the pool */
1770         mutex_enter(&pool->p_user_lock);
1771         while (pool->p_user_waiting) {
1772                 CALLB_CPR_SAFE_BEGIN(&cpr_info);
1773                 cv_wait(&pool->p_user_cv, &pool->p_user_lock);
1774                 CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
1775         }
1776         mutex_exit(&pool->p_user_lock);
1777 
1778         CALLB_CPR_EXIT(&cpr_info);
1779         svc_pool_cleanup(pool);
1780         zthread_exit();
1781 }
1782 
1783 /*
1784  * If the creator thread is idle, signal it to create
1785  * a new service thread.
1786  */
1787 static void
1788 svc_creator_signal(SVCPOOL *pool)
1789 {
1790         mutex_enter(&pool->p_creator_lock);
1791         if (pool->p_creator_signaled == FALSE) {
1792                 pool->p_creator_signaled = TRUE;
1793                 cv_signal(&pool->p_creator_cv);
1794         }
1795         mutex_exit(&pool->p_creator_lock);
1796 }
1797 
1798 /*
1799  * Notify the creator thread to clean up and exit.
1800  */
1801 static void
1802 svc_creator_signalexit(SVCPOOL *pool)
1803 {
1804         mutex_enter(&pool->p_creator_lock);
1805         pool->p_creator_exit = TRUE;
1806         cv_signal(&pool->p_creator_cv);
1807         mutex_exit(&pool->p_creator_lock);
1808 }
1809 
1810 /*
1811  * Polling part of the svc_run().
1812  * - search for a transport with a pending request
1813  * - when one is found then latch the request lock and return to svc_run()
1814  * - if there is no request, go to sleep and wait for a signal
1815  * - handle two exceptions:
1816  *   a) current transport is closing
1817  *   b) timeout waiting for a new request
1818  *   in both cases return to svc_run()
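 *
 * The value returned is either a master transport handle with its
 * xp_req_lock held, or one of the special values SVC_EXPRTGONE,
 * SVC_ETIMEDOUT, or SVC_EINTR (matching the ASSERT in svc_run()).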
1819  */
1820 static SVCMASTERXPRT *
1821 svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
1822 {
1823         /*
1824          * Main loop iterates until
1825          * a) we find a pending request,
1826          * b) we detect that the current transport is closing, or
1827          * c) we time out waiting for a new request.
1828          */
1829         for (;;) {
1830                 SVCMASTERXPRT *next;
1831                 clock_t timeleft;
1832 
1833                 /*
1834                  * Step 1.
1835                  * Check if there is a pending request on the current
1836                  * transport handle so that we can avoid cloning.
1837                  * If so then decrement the `pending-request' count for
1838                  * the pool and return to svc_run().
1839                  *
1840                  * We need to prevent potential starvation. If a selected
1841                  * transport always has new requests coming in, the service
1842                  * threads will never switch to another transport. With a
1843                  * limited number of service threads some transports may
1844                  * never be serviced.
1845                  * To prevent such a scenario we pick up at most
1846                  * pool->p_max_same_xprt requests from the same transport
1847                  * and then take a hint from the xprt-ready queue or walk
1848                  * the transport list.
1849                  */
1850                 if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
1851                     clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
1852                         mutex_enter(&xprt->xp_req_lock);
1853                         if (xprt->xp_req_head)
1854                                 return (xprt);
1855                         mutex_exit(&xprt->xp_req_lock);
1856                 }
1857                 clone_xprt->xp_same_xprt = 0;
1858 
1859                 /*
1860                  * Step 2.
1861                  * If there is no request on the current transport try to
1862                  * find another transport with a pending request.
1863                  */
1864                 mutex_enter(&pool->p_req_lock);
1865                 pool->p_walkers++;
1866                 mutex_exit(&pool->p_req_lock);
1867 
1868                 /*
1869                  * Make sure that transports will not be destroyed
1870                  * while we are checking them.
1871                  */
1872                 rw_enter(&pool->p_lrwlock, RW_READER);
1873 
1874                 for (;;) {
1875                         SVCMASTERXPRT *hint;
1876 
1877                         /*
1878                          * Get the next transport from the xprt-ready queue.
1879                          * This is a hint. There is no guarantee that the
1880                          * transport still has a pending request since it
1881                          * could be picked up by another thread in step 1.
1882                          *
1883                          * If the transport has a pending request then keep
1884                          * it locked. Decrement the pool's `walking-threads'
1885                          * count and return to svc_run() (the `pending-requests'
1886                          * count is decremented there).
1887                          */
1888                         hint = svc_xprt_qget(pool);
1889 
1890                         if (hint && hint->xp_req_head) {
1891                                 mutex_enter(&hint->xp_req_lock);
1892                                 if (hint->xp_req_head) {
1893                                         rw_exit(&pool->p_lrwlock);
1894 
1895                                         mutex_enter(&pool->p_req_lock);
1896                                         pool->p_walkers--;
1897                                         mutex_exit(&pool->p_req_lock);
1898 
1899                                         return (hint);
1900                                 }
1901                                 mutex_exit(&hint->xp_req_lock);
1902                         }
1903 
1904                         /*
1905                          * If there was no hint in the xprt-ready queue then
1906                          * - if there are fewer pending requests than
1907                          *   polling threads, go to sleep
1908                          * - otherwise check if there was an overflow in the
1909                          *   xprt-ready queue; if so, then we need to break
1910                          *   the `drain' mode
1911                          */
1912                         if (hint == NULL) {
1913                                 if (pool->p_reqs < pool->p_walkers) {
1914                                         mutex_enter(&pool->p_req_lock);
1915                                         if (pool->p_reqs < pool->p_walkers)
1916                                                 goto sleep;
1917                                         mutex_exit(&pool->p_req_lock);
1918                                 }
1919                                 if (pool->p_qoverflow) {
1920                                         break;
1921                                 }
1922                         }
1923                 }
1924 
1925                 /*
1926                  * If there was an overflow in the xprt-ready queue then we
1927                  * need to switch to the `drain' mode, i.e. walk through the
1928                  * pool's transport list and search for a transport with a
1929                  * pending request. If we manage to drain all the pending
1930                  * requests then we can clear the overflow flag. This will
1931                  * switch svc_poll() back to taking hints from the xprt-ready
1932                  * queue (which is generally more efficient).
1933                  *
1934                  * If there are no registered transports simply go to sleep.
1935                  */
1936                 if (xprt == NULL && pool->p_lhead == NULL) {
1937                         mutex_enter(&pool->p_req_lock);
1938                         goto sleep;
1939                 }
1940 
1941                 /*
1942                  * `Walk' through the pool's list of master server
1943                  * transport handles. Continue to loop until there are
1944                  * fewer pending requests than walking threads.
1945                  */
1946                 next = xprt ? xprt->xp_next : pool->p_lhead;
1947 
1948                 for (;;) {
1949                         /*
1950                          * Check if there is a request on this transport.
1951                          *
1952                          * Since blocking on a locked mutex is very expensive,
1953                          * check for a request without a lock first. We may
1954                          * miss a request that is just being delivered, but
1955                          * that will cost at most one full walk through the list.
1956                          */
1957                         if (next->xp_req_head) {
1958                                 /*
1959                                  * Check again, now with a lock.
1960                                  */
1961                                 mutex_enter(&next->xp_req_lock);
1962                                 if (next->xp_req_head) {
1963                                         rw_exit(&pool->p_lrwlock);
1964 
1965                                         mutex_enter(&pool->p_req_lock);
1966                                         pool->p_walkers--;
1967                                         mutex_exit(&pool->p_req_lock);
1968 
1969                                         return (next);
1970                                 }
1971                                 mutex_exit(&next->xp_req_lock);
1972                         }
1973 
1974                         /*
1975                          * Continue to `walk' through the pool's
1976                          * transport list until there are fewer requests
1977                          * than walkers. Check this condition without
1978                          * a lock first to avoid contention on a mutex.
1979                          */
1980                         if (pool->p_reqs < pool->p_walkers) {
1981                                 /* Check again, now with the lock. */
1982                                 mutex_enter(&pool->p_req_lock);
1983                                 if (pool->p_reqs < pool->p_walkers)
1984                                         break;  /* goto sleep */
1985                                 mutex_exit(&pool->p_req_lock);
1986                         }
1987 
1988                         next = next->xp_next;
1989                 }
1990 
1991         sleep:
1992                 /*
1993                  * No work to do. Stop the `walk' and go to sleep.
1994                  * Decrement the `walking-threads' count for the pool.
1995                  */
1996                 pool->p_walkers--;
1997                 rw_exit(&pool->p_lrwlock);
1998 
1999                 /*
2000                  * Count us as asleep, mark this thread as safe
2001                  * for suspend and wait for a request.
2002                  */
2003                 pool->p_asleep++;
2004                 timeleft = cv_reltimedwait_sig(&pool->p_req_cv,
2005                     &pool->p_req_lock, pool->p_timeout, TR_CLOCK_TICK);
2006 
2007                 /*
2008                  * If the drowsy flag is on, someone has signaled
2009                  * a wakeup. In such a case the `asleep-threads'
2010                  * count has already been updated, so just clear
2011                  * the flag.
2012                  *
2013                  * If the drowsy flag is off then we need to update
2014                  * the `asleep-threads' count.
2015                  */
2016                 if (pool->p_drowsy) {
2017                         pool->p_drowsy = FALSE;
2018                         /*
2019                          * If the thread is here because it timed out,
2020                          * instead of returning SVC_ETIMEDOUT, it is
2021                          * time to do some more work.
2022                          */
2023                         if (timeleft == -1)
2024                                 timeleft = 1;
2025                 } else {
2026                         pool->p_asleep--;
2027                 }
2028                 mutex_exit(&pool->p_req_lock);
2029 
2030                 /*
2031                  * If we received a signal while waiting for a
2032                  * request, inform svc_run(), so that we can return
2033                  * to user level and exit.
2034                  */
2035                 if (timeleft == 0)
2036                         return (SVC_EINTR);
2037 
2038                 /*
2039                  * If the current transport is gone then notify
2040                  * svc_run() to unlink from it.
2041                  */
2042                 if (xprt && xprt->xp_wq == NULL)
2043                         return (SVC_EXPRTGONE);
2044 
2045                 /*
2046                  * If we have timed out waiting for a request inform
2047                  * svc_run() that we probably don't need this thread.
2048                  */
2049                 if (timeleft == -1)
2050                         return (SVC_ETIMEDOUT);
2051         }
2052 }
2053 
2054 /*
2055  * Calculate the memory space used by a message.
2056  */
2057 static size_t
2058 svc_msgsize(mblk_t *mp)
2059 {
2060         size_t count = 0;
2061 
2062         for (; mp; mp = mp->b_cont)
2063                 count += MBLKSIZE(mp);
2064 
2065         return (count);
2066 }
2067 
2068 /*
2069  * svc_flowcontrol() attempts to turn the flow control on or off for the
2070  * transport.
2071  *
2072  * On input, xprt->xp_full indicates whether the flow control is currently
2073  * off (FALSE) or on (TRUE).  If it is off we test whether we should
2074  * turn it on, and vice versa.
2075  *
2076  * There are two conditions considered for the flow control.  Both conditions
2077  * have the low and the high watermark.  Once the high watermark is reached in
2078  * EITHER condition the flow control is turned on.  For turning the flow
2079  * control off BOTH conditions must be below the low watermark.
2080  *
2081  * Condition #1 - Number of requests queued:
2082  *
2083  * The max number of threads working on the pool is roughly pool->p_maxthreads.
2084  * Every thread could handle up to pool->p_max_same_xprt requests from one
2085  * transport before it moves to another transport.  See svc_poll() for details.
2086  * In case all threads in the pool are working on a transport they will handle
2087  * no more than enough_reqs (pool->p_maxthreads * pool->p_max_same_xprt)
2088  * requests in one shot from that transport.  We are turning the flow control
2089  * on once the high watermark is reached for a transport so that the underlying
2090  * queue knows the rate of incoming requests is higher than we are able to
2091  * handle.
2092  *
2093  * The high watermark: 2 * enough_reqs
2094  * The low watermark: enough_reqs
2095  *
2096  * Condition #2 - Length of the data payload for the queued messages/requests:
2097  *
2098  * We want to prevent a particular pool from exhausting the memory, so once the
2099  * total length of queued requests for the whole pool reaches the high
2100  * watermark we start to turn on the flow control for significant memory
2101  * consumers (individual transports).  To keep the implementation simple
2102  * enough, this condition is not exact, because we count only the data part of
2103  * the queued requests and we ignore the overhead.  For our purposes this
2104  * should be enough.  We should also consider that up to pool->p_maxthreads
2105  * threads for the pool might work on large requests (this is not counted for
2106  * this condition).  We need to leave some space for the rest of the system and
2107  * other big memory consumers (like ZFS).  Also, after the flow control is
2108  * turned on (on cots transports) we can start to accumulate a few megabytes in
2109  * queues for each transport.
2110  *
2111  * Usually, the big memory consumers are NFS WRITE requests, so we do not
2112  * expect to see this condition met for other than NFS pools.
2113  *
2114  * The high watermark: 1/5 of available memory
2115  * The low watermark: 1/6 of available memory
2116  *
2117  * Once the high watermark is reached we turn the flow control on only for
2118  * transports exceeding a per-transport memory limit.  The per-transport
2119  * fraction of memory is calculated as:
2120  *
2121  * the high watermark / number of transports
2122  *
2123  * For transports with less than the per-transport fraction of memory consumed,
2124  * the flow control is not turned on, so they are not blocked by a few "hungry"
2125  * transports.  Because of this, the total memory consumption for the
2126  * particular pool might grow up to 2 * the high watermark.
2127  *
2128  * The individual transports are unblocked once their consumption is below:
2129  *
2130  * per-transport fraction of memory / 2
2131  *
2132  * or once the total memory consumption for the whole pool falls below the low
2133  * watermark.
2134  *
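 * A worked example with illustrative numbers: with p_maxthreads = 16 and
 * p_max_same_xprt = 8, enough_reqs is 128, so condition #1 turns the flow
 * control on for a transport at 256 queued requests and allows it off
 * again at 128 or fewer.  On a machine with 10 GB of memory the pool-wide
 * high watermark is 2 GB and the low watermark is ~1.7 GB; with 4
 * registered transports each transport's share is 512 MB, and a blocked
 * transport unblocks once it is below 256 MB.
 *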
2135  */
2136 static void
2137 svc_flowcontrol(SVCMASTERXPRT *xprt)
2138 {
2139         SVCPOOL *pool = xprt->xp_pool;
2140         size_t totalmem = ptob(physmem);
2141         int enough_reqs = pool->p_maxthreads * pool->p_max_same_xprt;
2142 
2143         ASSERT(MUTEX_HELD(&xprt->xp_req_lock));
2144 
2145         /* Should we turn the flow control on? */
2146         if (xprt->xp_full == FALSE) {
2147                 /* Is flow control disabled? */
2148                 if (svc_flowcontrol_disable != 0)
2149                         return;
2150 
2151                 /* Are there enough requests queued? */
2152                 if (xprt->xp_reqs >= enough_reqs * 2) {
2153                         xprt->xp_full = TRUE;
2154                         return;
2155                 }
2156 
2157                 /*
2158                  * If this pool uses over 20% of available memory and this
2159                  * transport is a significant memory consumer then we are full
2160                  */
2161                 if (pool->p_size >= totalmem / 5 &&
2162                     xprt->xp_size >= totalmem / 5 / pool->p_lcount)
2163                         xprt->xp_full = TRUE;
2164 
2165                 return;
2166         }
2167 
2168         /* We might want to turn the flow control off */
2169 
2170         /* Do we still have enough requests? */
2171         if (xprt->xp_reqs > enough_reqs)
2172                 return;
2173 
2174         /*
2175          * If this pool still uses over 16% of available memory and this
2176          * transport is still a significant memory consumer then we are still full
2177          */
2178         if (pool->p_size >= totalmem / 6 &&
2179             xprt->xp_size >= totalmem / 5 / pool->p_lcount / 2)
2180                 return;
2181 
2182         /* Turn the flow control off and make sure rpcmod is notified */
2183         xprt->xp_full = FALSE;
2184         xprt->xp_enable = TRUE;
2185 }
2186 
2187 /*
2188  * Main loop of the kernel RPC server
2189  * - wait for input (find a transport with a pending request).
2190  * - dequeue the request
2191  * - call a registered server routine to process the requests
2192  *
2193  * There can be many threads running concurrently in this loop
2194  * on the same or on different transports.
2195  */
2196 static int
2197 svc_run(SVCPOOL *pool)
2198 {
2199         SVCMASTERXPRT *xprt = NULL;     /* master transport handle  */
2200         SVCXPRT *clone_xprt;    /* clone for this thread    */
2201         proc_t *p = ttoproc(curthread);
2202 
2203         /* Allocate a clone transport handle for this thread */
2204         clone_xprt = svc_clone_init();
2205 
2206         /*
2207          * The loop iterates until the thread has been
2208          * idle for too long or the transport is gone.
2209          */
2210         for (;;) {
2211                 SVCMASTERXPRT *next;
2212                 mblk_t *mp;
2213                 bool_t enable;
2214                 size_t size;
2215 
2216                 TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
2217 
2218                 /*
2219                  * If the process is exiting/killed, return
2220                  * immediately without processing any more
2221                  * requests.
2222                  */
2223                 if (p->p_flag & (SEXITING | SKILLED)) {
2224                         svc_thread_exit(pool, clone_xprt);
2225                         return (EINTR);
2226                 }
2227 
2228                 /* Find a transport with a pending request */
2229                 next = svc_poll(pool, xprt, clone_xprt);
2230 
2231                 /*
2232                  * If svc_poll() finds a transport with a request
2233                  * it latches xp_req_lock on it. Therefore we need
2234                  * to dequeue the request and release the lock as
2235                  * soon as possible.
2236                  */
2237                 ASSERT(next != NULL &&
2238                     (next == SVC_EXPRTGONE ||
2239                     next == SVC_ETIMEDOUT ||
2240                     next == SVC_EINTR ||
2241                     MUTEX_HELD(&next->xp_req_lock)));
2242 
2243                 /* Oops! The current transport is closing. Unlink now */
2244                 if (next == SVC_EXPRTGONE) {
2245                         svc_clone_unlink(clone_xprt);
2246                         xprt = NULL;
2247                         continue;
2248                 }
2249 
2250                 /* Oops! Timed out while waiting for a request. Exit */
2251                 if (next == SVC_ETIMEDOUT) {
2252                         svc_thread_exit(pool, clone_xprt);
2253                         return (0);
2254                 }
2255 
2256                 /*
2257                  * Interrupted by a signal while waiting for a
2258                  * request. Return to userspace and exit.
2259                  */
2260                 if (next == SVC_EINTR) {
2261                         svc_thread_exit(pool, clone_xprt);
2262                         return (EINTR);
2263                 }
2264 
2265                 /*
2266                  * De-queue the request and release the request lock
2267                  * on this transport (latched by svc_poll()).
2268                  */
2269                 mp = next->xp_req_head;
2270                 next->xp_req_head = mp->b_next;
2271                 mp->b_next = (mblk_t *)0;
2272                 size = svc_msgsize(mp);
2273 
2274                 mutex_enter(&pool->p_req_lock);
2275                 pool->p_reqs--;
2276                 if (pool->p_reqs == 0)
2277                         pool->p_qoverflow = FALSE;
2278                 pool->p_size -= size;
2279                 mutex_exit(&pool->p_req_lock);
2280 
2281                 next->xp_reqs--;
2282                 next->xp_size -= size;
2283 
2284                 if (next->xp_full)
2285                         svc_flowcontrol(next);
2286 
2287                 TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
2288                     "rpc_que_req_deq:pool %p mp %p", pool, mp);
2289                 mutex_exit(&next->xp_req_lock);
2290 
2291                 /*
2292                  * If this is a new request on a current transport then
2293                  * the clone structure is already properly initialized.
2294                  * Otherwise, if the request is on a different transport,
2295                  * unlink from the current master and link to
2296                  * the one we got a request on.
2297                  */
2298                 if (next != xprt) {
2299                         if (xprt)
2300                                 svc_clone_unlink(clone_xprt);
2301                         svc_clone_link(next, clone_xprt, NULL);
2302                         xprt = next;
2303                 }
2304 
2305                 /*
2306                  * If there are more requests and req_cv hasn't
2307                  * been signaled yet then wake up one more thread now.
2308                  *
2309                  * We avoid signaling req_cv until the most recently
2310                  * signaled thread wakes up and gets CPU to clear
2311                  * the `drowsy' flag.
2312                  */
2313                 if (!(pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2314                     pool->p_asleep == 0)) {
2315                         mutex_enter(&pool->p_req_lock);
2316 
2317                         if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2318                             pool->p_asleep == 0)
2319                                 mutex_exit(&pool->p_req_lock);
2320                         else {
2321                                 pool->p_asleep--;
2322                                 pool->p_drowsy = TRUE;
2323 
2324                                 cv_signal(&pool->p_req_cv);
2325                                 mutex_exit(&pool->p_req_lock);
2326                         }
2327                 }
2328 
2329                 /*
2330                  * If there are no asleep/signaled threads, we are
2331                  * still below pool->p_maxthreads limit, and no thread is
2332                  * currently being created then signal the creator
2333                  * for one more service thread.
2334                  *
2335                  * The asleep and drowsy checks are not protected
2336                  * by a lock since it hurts performance and a wrong
2337                  * decision is not essential.
2338                  */
2339                 if (pool->p_asleep == 0 && !pool->p_drowsy &&
2340                     pool->p_threads + pool->p_detached_threads <
2341                     pool->p_maxthreads)
2342                         svc_creator_signal(pool);
2343 
2344                 /*
2345                  * Process the request.
2346                  */
2347                 svc_getreq(clone_xprt, mp);
2348 
2349                 /* If thread had a reservation it should have been canceled */
2350                 ASSERT(!clone_xprt->xp_reserved);
2351 
2352                 /*
2353                  * If the clone is marked detached then exit.
2354                  * The rpcmod slot has already been released
2355                  * when we detached this thread.
2356                  */
2357                 if (clone_xprt->xp_detached) {
2358                         svc_thread_exitdetached(pool, clone_xprt);
2359                         return (0);
2360                 }
2361 
2362                 /*
2363                  * Release our reference on the rpcmod
2364                  * slot attached to xp_wq->q_ptr.
2365                  */
2366                 mutex_enter(&xprt->xp_req_lock);
2367                 enable = xprt->xp_enable;
2368                 if (enable)
2369                         xprt->xp_enable = FALSE;
2370                 mutex_exit(&xprt->xp_req_lock);
2371                 SVC_RELE(clone_xprt, NULL, enable);
2372         }
2373         /* NOTREACHED */
2374 }
2375 
2376 /*
2377  * Flush any pending requests for the queue and
2378  * free the associated mblks.
2379  */
2380 void
2381 svc_queueclean(queue_t *q)
2382 {
2383         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2384         mblk_t *mp;
2385         SVCPOOL *pool;
2386 
2387         /*
2388          * clean up the requests
2389          */
2390         mutex_enter(&xprt->xp_req_lock);
2391         pool = xprt->xp_pool;
2392         while ((mp = xprt->xp_req_head) != NULL) {
2393                 /* remove the request from the list */
2394                 xprt->xp_req_head = mp->b_next;
2395                 mp->b_next = (mblk_t *)0;
2396                 SVC_RELE(xprt, mp, FALSE);
2397         }
2398 
2399         mutex_enter(&pool->p_req_lock);
2400         pool->p_reqs -= xprt->xp_reqs;
2401         pool->p_size -= xprt->xp_size;
2402         mutex_exit(&pool->p_req_lock);
2403 
2404         xprt->xp_reqs = 0;
2405         xprt->xp_size = 0;
2406         xprt->xp_full = FALSE;
2407         xprt->xp_enable = FALSE;
2408         mutex_exit(&xprt->xp_req_lock);
2409 }
2410 
2411 /*
2412  * This routine is called by rpcmod to inform kernel RPC that a
2413  * queue is closing. It is called after all the requests have been
2414  * picked up (that is after all the slots on the queue have
2415  * been released by kernel RPC). It is also guaranteed that no more
2416  * requests will be delivered on this transport.
2417  *
2418  * - clear xp_wq to mark the master server transport handle as closing
2419  * - if there are no more threads on this transport close/destroy it
2420  * - otherwise, leave the linked threads to close/destroy the transport
2421  *   later.
2422  */
2423 void
2424 svc_queueclose(queue_t *q)
2425 {
2426         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2427 
2428         if (xprt == NULL) {
2429                 /*
2430                  * If there is no master xprt associated with this stream,
2431                  * then there is nothing to do.  This happens regularly
2432                  * with connection-oriented listening streams created by
2433                  * nfsd.
2434                  */
2435                 return;
2436         }
2437 
2438         mutex_enter(&xprt->xp_thread_lock);
2439 
2440         ASSERT(xprt->xp_req_head == NULL);
2441         ASSERT(xprt->xp_wq != NULL);
2442 
2443         xprt->xp_wq = NULL;
2444 
2445         if (xprt->xp_threads == 0) {
2446                 SVCPOOL *pool = xprt->xp_pool;
2447 
2448                 /*
2449                  * svc_xprt_cleanup() destroys the transport
2450                  * or releases the transport thread lock
2451                  */
2452                 svc_xprt_cleanup(xprt, FALSE);
2453 
2454                 mutex_enter(&pool->p_thread_lock);
2455 
2456                 /*
2457                  * If the pool is in closing state and this was
2458                  * the last transport in the pool then signal the creator
2459                  * thread to clean up and exit.
2460                  */
2461                 if (pool->p_closing && svc_pool_tryexit(pool)) {
2462                         return;
2463                 }
2464                 mutex_exit(&pool->p_thread_lock);
2465         } else {
2466                 /*
2467                  * There are still some threads linked to the transport.  They
2468                  * are very likely sleeping in svc_poll().  We could wake
2469                  * them up by broadcasting on the p_req_cv condition variable, but
2470                  * that might give us a performance penalty if there are too
2471                  * many sleeping threads.
2472                  *
2473                  * Instead, we do nothing here.  The linked threads will unlink
2474                  * themselves and destroy the transport once they are woken up
2475                  * on a timeout or by a new request.  There is no reason
2476                  * to hurry the thread wake-up now.
2477                  */
2478 
2479                 /*
2480                  *  NOTICE: No references to the master transport structure
2481                  *          beyond this point!
2482                  */
2483                 mutex_exit(&xprt->xp_thread_lock);
2484         }
2485 }
2486 
2487 /*
2488  * Interrupt `request delivery' routine called from rpcmod
2489  * - put a request at the tail of the transport request queue
2490  * - insert a hint for svc_poll() into the xprt-ready queue
2491  * - increment the `pending-requests' count for the pool
2492  * - handle flow control
2493  * - wake up a thread sleeping in svc_poll() if necessary
2494  * - if all the threads are running ask the creator for a new one.
2495  */
2496 bool_t
2497 svc_queuereq(queue_t *q, mblk_t *mp, bool_t flowcontrol)
2498 {
2499         SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
2500         SVCPOOL *pool = xprt->xp_pool;
2501         size_t size;
2502 
2503         TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
2504 
2505         ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
2506             mp->b_datap->db_type != M_DATA);
2507 
2508         /*
2509          * Step 1.
2510          * Grab the transport's request lock and the
2511          * pool's request lock so that putting the request
2512          * at the tail of the transport's request queue,
2513          * possibly inserting the request into the xprt-ready
2514          * queue, and incrementing the pending request
2515          * count all look atomic.
2516          */
2517         mutex_enter(&xprt->xp_req_lock);
2518         if (flowcontrol && xprt->xp_full) {
2519                 mutex_exit(&xprt->xp_req_lock);
2520 
2521                 return (FALSE);
2522         }
2523         ASSERT(xprt->xp_full == FALSE);
2524         mutex_enter(&pool->p_req_lock);
2525         if (xprt->xp_req_head == NULL)
2526                 xprt->xp_req_head = mp;
2527         else
2528                 xprt->xp_req_tail->b_next = mp;
2529         xprt->xp_req_tail = mp;
2530 
2531         /*
2532          * Step 2.
2533          * Insert a hint into the xprt-ready queue, increment
2534          * counters, handle flow control, and wake up
2535          * a thread sleeping in svc_poll() if necessary.
2536          */
2537 
2538         /* Insert pointer to this transport into the xprt-ready queue */
2539         svc_xprt_qput(pool, xprt);
2540 
2541         /* Increment counters */
2542         pool->p_reqs++;
2543         xprt->xp_reqs++;
2544 
2545         size = svc_msgsize(mp);
2546         xprt->xp_size += size;
2547         pool->p_size += size;
2548 
2549         /* Handle flow control */
2550         if (flowcontrol)
2551                 svc_flowcontrol(xprt);
2552 
2553         TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
2554             "rpc_que_req_enq:pool %p mp %p", pool, mp);
2555 
2556         /*
2557          * If there are more requests and req_cv hasn't
2558          * been signaled yet then wake up one more thread now.
2559          *
2560          * We avoid signaling req_cv until the most recently
2561          * signaled thread wakes up and gets CPU to clear
2562          * the `drowsy' flag.
2563          */
2564         if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
2565             pool->p_asleep == 0) {
2566                 mutex_exit(&pool->p_req_lock);
2567         } else {
2568                 pool->p_drowsy = TRUE;
2569                 pool->p_asleep--;
2570 
2571                 /*
2572                  * Signal wakeup and drop the request lock.
2573                  */
2574                 cv_signal(&pool->p_req_cv);
2575                 mutex_exit(&pool->p_req_lock);
2576         }
2577         mutex_exit(&xprt->xp_req_lock);
2578 
2579         /*
2580          * Step 3.
2581          * If there are no asleep/signaled threads, we are
2582          * still below pool->p_maxthreads limit, and no thread is
2583          * currently being created then signal the creator
2584          * for one more service thread.
2585          *
2586          * The asleep and drowsy checks are not protected
2587          * by a lock since it hurts performance and a wrong
2588          * decision is not essential.
2589          */
2590         if (pool->p_asleep == 0 && !pool->p_drowsy &&
2591             pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
2592                 svc_creator_signal(pool);
2593 
2594         TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
2595             "svc_queuereq_end:(%S)", "end");
2596 
2597         return (TRUE);
2598 }
2599 
2600 /*
2601  * Reserve a service thread so that it can be detached later.
2602  * This reservation is required to make sure that when it tries to
2603  * detach itself the total number of detached threads does not exceed
2604  * pool->p_maxthreads - pool->p_redline (i.e. that we always keep
2605  * at least pool->p_redline non-detached threads).
2606  *
2607  * If the thread does not detach itself later, it should cancel the
2608  * reservation before returning to svc_run().
2609  *
2610  * - check if there is room for more reserved/detached threads
2611  * - if so, then increment the `reserved threads' count for the pool
2612  * - mark the thread as reserved (setting the flag in the clone transport
2613  *   handle for this thread)
2614  * - returns 1 if the reservation succeeded, 0 if it failed.
2615  */
2616 int
2617 svc_reserve_thread(SVCXPRT *clone_xprt)
2618 {
2619         SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
2620 
2621         /* Recursive reservations are not allowed */
2622         ASSERT(!clone_xprt->xp_reserved);
2623         ASSERT(!clone_xprt->xp_detached);
2624 
2625         /* Check pool counts if there is room for reservation */
2626         mutex_enter(&pool->p_thread_lock);
2627         if (pool->p_reserved_threads + pool->p_detached_threads >=
2628             pool->p_maxthreads - pool->p_redline) {
2629                 mutex_exit(&pool->p_thread_lock);
2630                 return (0);
2631         }
2632         pool->p_reserved_threads++;
2633         mutex_exit(&pool->p_thread_lock);
2634 
2635         /* Mark the thread (clone handle) as reserved */
2636         clone_xprt->xp_reserved = TRUE;
2637 
2638         return (1);
2639 }
2640 
2641 /*
2642  * Cancel a reservation for a thread.
2643  * - decrement the `reserved threads' count for the pool
2644  * - clear the flag in the clone transport handle for this thread.
2645  */
2646 void
2647 svc_unreserve_thread(SVCXPRT *clone_xprt)
2648 {
2649         SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
2650 
2651         /* Thread must have a reservation */
2652         ASSERT(clone_xprt->xp_reserved);
2653         ASSERT(!clone_xprt->xp_detached);
2654 
2655         /* Decrement global count */
2656         mutex_enter(&pool->p_thread_lock);
2657         pool->p_reserved_threads--;
2658         mutex_exit(&pool->p_thread_lock);
2659 
2660         /* Clear reservation flag */
2661         clone_xprt->xp_reserved = FALSE;
2662 }
2663 
2664 /*
2665  * Detach a thread from its transport, so that it can block for an
2666  * extended time.  Because the transport can be closed after the thread is
2667  * detached, the thread should have already sent off a reply if it was
2668  * going to send one.
2669  *
2670  * - decrement `non-detached threads' count and increment `detached threads'
2671  *   counts for the transport
2672  * - decrement the  `non-detached threads' and `reserved threads'
2673  *   counts and increment the `detached threads' count for the pool
2674  * - release the rpcmod slot
2675  * - mark the clone (thread) as detached.
2676  *
2677  * No need to return a pointer to the thread's CPR information, since
2678  * the thread has a userland identity.
2679  *
2680  * NOTICE: a thread must not detach itself without making a prior reservation
2681  *         through svc_reserve_thread().
2682  */
2683 callb_cpr_t *
2684 svc_detach_thread(SVCXPRT *clone_xprt)
2685 {
2686         SVCMASTERXPRT *xprt = clone_xprt->xp_master;
2687         SVCPOOL *pool = xprt->xp_pool;
2688         bool_t enable;
2689 
2690         /* Thread must have a reservation */
2691         ASSERT(clone_xprt->xp_reserved);
2692         ASSERT(!clone_xprt->xp_detached);
2693 
2694         /* Bookkeeping for this transport */
2695         mutex_enter(&xprt->xp_thread_lock);
2696         xprt->xp_threads--;
2697         xprt->xp_detached_threads++;
2698         mutex_exit(&xprt->xp_thread_lock);
2699 
2700         /* Bookkeeping for the pool */
2701         mutex_enter(&pool->p_thread_lock);
2702         pool->p_threads--;
2703         pool->p_reserved_threads--;
2704         pool->p_detached_threads++;
2705         mutex_exit(&pool->p_thread_lock);
2706 
2707         /* Release an rpcmod slot for this request */
2708         mutex_enter(&xprt->xp_req_lock);
2709         enable = xprt->xp_enable;
2710         if (enable)
2711                 xprt->xp_enable = FALSE;
2712         mutex_exit(&xprt->xp_req_lock);
2713         SVC_RELE(clone_xprt, NULL, enable);
2714 
2715         /* Mark the clone (thread) as detached */
2716         clone_xprt->xp_reserved = FALSE;
2717         clone_xprt->xp_detached = TRUE;
2718 
2719         return (NULL);
2720 }
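
/*
 * A sketch of the reserve/detach protocol from a service routine's point
 * of view (the long-running step is hypothetical):
 *
 *      if (svc_reserve_thread(clone_xprt)) {
 *              <send the reply now; the transport may close once detached>
 *              (void) svc_detach_thread(clone_xprt);
 *              <perform the long blocking operation>
 *      }
 *
 * When the routine returns to svc_run(), a detached thread exits through
 * svc_thread_exitdetached().  A thread that reserved but did not detach
 * must call svc_unreserve_thread() before returning to svc_run().
 */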
2721 
2722 /*
2723  * This routine is responsible for extracting the RDMA plugin master XPRT,
2724  * unregistering it from the SVCPOOL, and initiating plugin-specific cleanup.
2725  * It is passed a list/group of rdma transport records which are
2726  * active in a given registered or unregistered kRPC thread pool. It shuts
2727  * down all active rdma transports in that pool. If the thread active on the
2728  * transport happens to be the last thread for that pool, it will signal the
2729  * creator thread to clean up the pool and destroy the xprt in svc_queueclose().
2730  */
2731 void
2732 rdma_stop(rdma_xprt_group_t *rdma_xprts)
2733 {
2734         SVCMASTERXPRT *xprt;
2735         rdma_xprt_record_t *curr_rec;
2736         queue_t *q;
2737         mblk_t *mp;
2738         int i, rtg_count;
2739         SVCPOOL *pool;
2740 
2741         if (rdma_xprts->rtg_count == 0)
2742                 return;
2743 
2744         rtg_count = rdma_xprts->rtg_count;
2745 
2746         for (i = 0; i < rtg_count; i++) {
2747                 curr_rec = rdma_xprts->rtg_listhead;
2748                 rdma_xprts->rtg_listhead = curr_rec->rtr_next;
2749                 rdma_xprts->rtg_count--;
2750                 curr_rec->rtr_next = NULL;
2751                 xprt = curr_rec->rtr_xprt_ptr;
2752                 q = xprt->xp_wq;
2753                 svc_rdma_kstop(xprt);
2754 
2755                 mutex_enter(&xprt->xp_req_lock);
2756                 pool = xprt->xp_pool;
2757                 while ((mp = xprt->xp_req_head) != NULL) {
2758                         rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
2759 
2760                         /* remove the request from the list */
2761                         xprt->xp_req_head = mp->b_next;
2762                         mp->b_next = (mblk_t *)0;
2763 
2764                         RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
2765                         RDMA_REL_CONN(rdp->conn);
2766                         freemsg(mp);
2767                 }
2768                 mutex_enter(&pool->p_req_lock);
2769                 pool->p_reqs -= xprt->xp_reqs;
2770                 pool->p_size -= xprt->xp_size;
2771                 mutex_exit(&pool->p_req_lock);
2772                 xprt->xp_reqs = 0;
2773                 xprt->xp_size = 0;
2774                 xprt->xp_full = FALSE;
2775                 xprt->xp_enable = FALSE;
2776                 mutex_exit(&xprt->xp_req_lock);
2777                 svc_queueclose(q);
2778 #ifdef  DEBUG
2779                 if (rdma_check)
2780                         cmn_err(CE_NOTE, "rdma_stop: Exited svc_queueclose\n");
2781 #endif
2782                 /*
2783                  * Free the rdma transport record for the expunged rdma
2784                  * based master transport handle.
2785                  */
2786                 kmem_free(curr_rec, sizeof (rdma_xprt_record_t));
2787                 if (!rdma_xprts->rtg_listhead)
2788                         break;
2789         }
2790 }
2791 
2792 
2793 /*
2794  * rpc_msg_dup/rpc_msg_free
2795  * Currently only used by svc_rpcsec_gss.c but put in this file as it
2796  * may be useful to others in the future.
2797  * Future consumers should be careful because, so far, these are
2798  *   - only tested/used for call msgs (not replies)
2799  *   - only tested/used with call verf oa_length == 0
2800  */
2801 struct rpc_msg *
2802 rpc_msg_dup(struct rpc_msg *src)
2803 {
2804         struct rpc_msg *dst;
2805         struct opaque_auth oa_src, oa_dst;
2806 
2807         dst = kmem_alloc(sizeof (*dst), KM_SLEEP);
2808 
2809         dst->rm_xid = src->rm_xid;
2810         dst->rm_direction = src->rm_direction;
2811 
2812         dst->rm_call.cb_rpcvers = src->rm_call.cb_rpcvers;
2813         dst->rm_call.cb_prog = src->rm_call.cb_prog;
2814         dst->rm_call.cb_vers = src->rm_call.cb_vers;
2815         dst->rm_call.cb_proc = src->rm_call.cb_proc;
2816 
2817         /* dup opaque auth call body cred */
2818         oa_src = src->rm_call.cb_cred;
2819 
2820         oa_dst.oa_flavor = oa_src.oa_flavor;
2821         oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
2822 
2823         bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
2824         oa_dst.oa_length = oa_src.oa_length;
2825 
2826         dst->rm_call.cb_cred = oa_dst;
2827 
2828         /* dup or just alloc opaque auth call body verifier */
2829         if (src->rm_call.cb_verf.oa_length > 0) {
2830                 oa_src = src->rm_call.cb_verf;
2831 
2832                 oa_dst.oa_flavor = oa_src.oa_flavor;
2833                 oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
2834 
2835                 bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
2836                 oa_dst.oa_length = oa_src.oa_length;
2837 
2838                 dst->rm_call.cb_verf = oa_dst;
2839         } else {
2840                 oa_dst.oa_flavor = -1;  /* will be set later */
2841                 oa_dst.oa_base = kmem_alloc(MAX_AUTH_BYTES, KM_SLEEP);
2842 
2843                 oa_dst.oa_length = 0;   /* will be set later */
2844 
2845                 dst->rm_call.cb_verf = oa_dst;
2846         }
2847         return (dst);
2853 }
2854 
2855 void
2856 rpc_msg_free(struct rpc_msg **msg, int cb_verf_oa_length)
2857 {
2858         struct rpc_msg *m = *msg;
2859 
2860         kmem_free(m->rm_call.cb_cred.oa_base, m->rm_call.cb_cred.oa_length);
2861         m->rm_call.cb_cred.oa_base = NULL;
2862         m->rm_call.cb_cred.oa_length = 0;
2863 
2864         kmem_free(m->rm_call.cb_verf.oa_base, cb_verf_oa_length);
2865         m->rm_call.cb_verf.oa_base = NULL;
2866         m->rm_call.cb_verf.oa_length = 0;
2867 
2868         kmem_free(m, sizeof (*m));
2869         *msg = NULL;
2870 }
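
/*
 * A usage sketch for the pair above, under the constraints noted in their
 * shared comment (call messages with verf oa_length == 0).  Since
 * rpc_msg_dup() allocates MAX_AUTH_BYTES for the verifier in that case,
 * the consumer frees with that length:
 *
 *      struct rpc_msg *copy = rpc_msg_dup(msg);
 *      ...
 *      rpc_msg_free(&copy, MAX_AUTH_BYTES);
 */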