/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2012 Milan Jurik. All rights reserved.
 * Copyright 2012 Marcel Telka <marcel@telka.sk>
 * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
 */
/* Copyright (c) 1990 Mentat Inc. */

/*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
/*      All Rights Reserved     */

/*
 * Kernel RPC filtering module
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/strsubr.h>
#include <sys/tihdr.h>
#include <sys/timod.h>
#include <sys/tiuser.h>
#include <sys/debug.h>
#include <sys/signal.h>
#include <sys/pcb.h>
#include <sys/user.h>
#include <sys/errno.h>
#include <sys/cred.h>
#include <sys/policy.h>
#include <sys/inline.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/file.h>
#include <sys/sysmacros.h>
#include <sys/systm.h>
#include <sys/t_lock.h>
#include <sys/ddi.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/strsun.h>

#include <sys/strlog.h>
#include <rpc/rpc_com.h>
#include <inet/common.h>
#include <rpc/types.h>
#include <sys/time.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpcsys.h>
#include <rpc/rpc_rdma.h>

/*
 * This is the loadable module wrapper.
 */
#include <sys/conf.h>
#include <sys/modctl.h>
#include <sys/syscall.h>

extern struct streamtab rpcinfo;

static struct fmodsw fsw = {
        "rpcmod",
        &rpcinfo,
        D_NEW|D_MP,
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
        &mod_strmodops, "rpc interface str mod", &fsw
};

/*
 * For the RPC system call.
 */
static struct sysent rpcsysent = {
        2,
        SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
        rpcsys
};

static struct modlsys modlsys = {
        &mod_syscallops,
        "RPC syscall",
        &rpcsysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
        &mod_syscallops32,
        "32-bit RPC syscall",
        &rpcsysent
};
#endif /* _SYSCALL32_IMPL */

static struct modlinkage modlinkage = {
        MODREV_1,
        {
                &modlsys,
#ifdef _SYSCALL32_IMPL
                &modlsys32,
#endif
                &modlstrmod,
                NULL
        }
};
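
/*
 * A single modlinkage carries all three linkage structures above, so the
 * one mod_install() call in _init() registers the RPC system call
 * (including the 32-bit entry point where applicable) and the STREAMS
 * module together.
 */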

int
_init(void)
{
        int error = 0;
        callb_id_t cid;
        int status;

        svc_init();
        clnt_init();
        cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");

        if (error = mod_install(&modlinkage)) {
                /*
                 * Could not install module, cleanup previous
                 * initialization work.
                 */
                clnt_fini();
                if (cid != NULL)
                        (void) callb_delete(cid);

                return (error);
        }

        /*
         * Load up the RDMA plugins and initialize the stats.  Even if a
         * plugin fails to load, the counters are still initialized as
         * long as rpcmod itself was successfully installed.
         */
        rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
        mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);

        cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
        mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);

        mt_kstat_init();

        /*
         * Get our identification into ldi.  This is used for loading
         * other modules, e.g. rpcib.
         */
        status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
        if (status != 0) {
                cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
                rpcmod_li = NULL;
        }

        return (error);
}

/*
 * The unload entry point always fails because we advertise entry points
 * into rpcmod from the rest of kRPC: rpcmod_release().
 */
int
_fini(void)
{
        return (EBUSY);
}

int
_info(struct modinfo *modinfop)
{
        return (mod_info(&modlinkage, modinfop));
}

extern int nulldev();

#define RPCMOD_ID       2049

int rmm_open(queue_t *, dev_t *, int, int, cred_t *);
int rmm_close(queue_t *, int, cred_t *);

/*
 * To save instructions, since STREAMS ignores the return value
 * from these functions, they are defined as void here. Kind of icky, but...
 */
void rmm_rput(queue_t *, mblk_t *);
void rmm_wput(queue_t *, mblk_t *);
void rmm_rsrv(queue_t *);
void rmm_wsrv(queue_t *);

int rpcmodopen(queue_t *, dev_t *, int, int, cred_t *);
int rpcmodclose(queue_t *, int, cred_t *);
void rpcmodrput(queue_t *, mblk_t *);
void rpcmodwput(queue_t *, mblk_t *);
void rpcmodrsrv();
void rpcmodwsrv(queue_t *);

static  void    rpcmodwput_other(queue_t *, mblk_t *);
static  int     mir_close(queue_t *q);
static  int     mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
                    cred_t *credp);
static  void    mir_rput(queue_t *q, mblk_t *mp);
static  void    mir_rsrv(queue_t *q);
static  void    mir_wput(queue_t *q, mblk_t *mp);
static  void    mir_wsrv(queue_t *q);

static struct module_info rpcmod_info =
        {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};

static struct qinit rpcmodrinit = {
        (int (*)())rmm_rput,
        (int (*)())rmm_rsrv,
        rmm_open,
        rmm_close,
        nulldev,
        &rpcmod_info,
        NULL
};

/*
 * The write put procedure is simply putnext to conserve stack space.
 * The write service procedure is not used to queue data, but instead to
 * synchronize with flow control.
 */
static struct qinit rpcmodwinit = {
        (int (*)())rmm_wput,
        (int (*)())rmm_wsrv,
        rmm_open,
        rmm_close,
        nulldev,
        &rpcmod_info,
        NULL
};
struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
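
/*
 * Illustrative sketch only (not part of this module): a consumer plumbs
 * rpcmod onto a transport stream and then types the slot with one of the
 * ioctls handled in rpcmodwput_other() below, conceptually:
 *
 *      fd = open("/dev/tcp", O_RDWR);
 *      ioctl(fd, I_PUSH, "rpcmod");
 *      ioctl(fd, RPC_CLIENT, 0);       ... or RPC_SERVER
 *
 * In practice the kernel consumers (kRPC clients, the NFS server) drive
 * the equivalent setup from kernel context.
 */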

struct xprt_style_ops {
        int (*xo_open)();
        int (*xo_close)();
        void (*xo_wput)();
        void (*xo_wsrv)();
        void (*xo_rput)();
        void (*xo_rsrv)();
};

/*
 * Read side has no service procedure.
 */
static struct xprt_style_ops xprt_clts_ops = {
        rpcmodopen,
        rpcmodclose,
        rpcmodwput,
        rpcmodwsrv,
        rpcmodrput,
        NULL
};

static struct xprt_style_ops xprt_cots_ops = {
        mir_open,
        mir_close,
        mir_wput,
        mir_wsrv,
        mir_rput,
        mir_rsrv
};

/*
 * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 */
struct rpcm {
        void            *rm_krpc_cell;  /* Reserved for use by kRPC */
        struct          xprt_style_ops  *rm_ops;
        int             rm_type;        /* Client or server side stream */
#define RM_CLOSING      0x1             /* somebody is trying to close slot */
        uint_t          rm_state;       /* state of the slot. see above */
        uint_t          rm_ref;         /* cnt of external references to slot */
        kmutex_t        rm_lock;        /* mutex protecting above fields */
        kcondvar_t      rm_cwait;       /* condition for closing */
        zoneid_t        rm_zoneid;      /* zone which pushed rpcmod */
};

struct temp_slot {
        void *cell;
        struct xprt_style_ops *ops;
        int type;
        mblk_t *info_ack;
        kmutex_t lock;
        kcondvar_t wait;
};
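
/*
 * A temp_slot lives on the opening thread's stack for the duration of
 * rmm_open() only: it parks the stream on the tmpops vector until the
 * T_INFO_ACK arrives and tells us whether to dress the queue as CLTS
 * (rpcmodopen) or COTS (mir_open).
 */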

typedef struct mir_s {
        void    *mir_krpc_cell; /* Reserved for kRPC use. This field */
                                        /* must be first in the structure. */
        struct xprt_style_ops   *rm_ops;
        int     mir_type;               /* Client or server side stream */

        mblk_t  *mir_head_mp;           /* RPC msg in progress */
                /*
                 * mir_head_mp points to the first mblk being collected in
                 * the current RPC message.  Record headers are removed
                 * before data is linked into mir_head_mp.
                 */
        mblk_t  *mir_tail_mp;           /* Last mblk in mir_head_mp */
                /*
                 * mir_tail_mp points to the last mblk in the message
                 * chain starting at mir_head_mp.  It is only valid
                 * if mir_head_mp is non-NULL and is used to add new
                 * data blocks to the end of chain quickly.
                 */

        int32_t mir_frag_len;           /* Bytes seen in the current frag */
                /*
                 * mir_frag_len starts at -4 for the beginning of each
                 * fragment.  When this length is negative, it indicates the
                 * number of bytes that rpcmod still needs to complete the
                 * record marker header.  When it is positive or zero, it
                 * holds the number of bytes that have arrived for the
                 * current fragment and are held in mir_head_mp.
                 */

        int32_t mir_frag_header;
                /*
                 * Fragment header as collected for the current fragment.
                 * It holds the last-fragment indicator and the number
                 * of bytes in the fragment.
                 */

        unsigned int
                mir_ordrel_pending : 1, /* Sent T_ORDREL_REQ */
                mir_hold_inbound : 1,   /* Hold inbound messages on server */
                                        /* side until outbound flow control */
                                        /* is relieved. */
                mir_closing : 1,        /* The stream is being closed */
                mir_inrservice : 1,     /* data queued or rd srv proc running */
                mir_inwservice : 1,     /* data queued or wr srv proc running */
                mir_inwflushdata : 1,   /* flush M_DATAs when srv runs */
                /*
                 * On client streams, mir_clntreq is 0 or 1; it is set
                 * to 1 whenever a new request is sent out (mir_wput)
                 * and cleared when the timer fires (mir_timer).  If
                 * the timer fires with this value equal to 0, then the
                 * stream is considered idle and kRPC is notified.
                 */
                mir_clntreq : 1,
                /*
                 * On server streams, stop accepting messages
                 */
                mir_svc_no_more_msgs : 1,
                mir_listen_stream : 1,  /* listen end point */
                mir_unused : 1, /* no longer used */
                mir_timer_call : 1,
                mir_junk_fill_thru_bit_31 : 21;

        int     mir_setup_complete;     /* server has initialized everything */
        timeout_id_t mir_timer_id;      /* Timer for idle checks */
        clock_t mir_idle_timeout;       /* Allowed idle time before shutdown */
                /*
                 * This value is copied from clnt_idle_timeout or
                 * svc_idle_timeout during the appropriate ioctl.
                 * Kept in milliseconds
                 */
        clock_t mir_use_timestamp;      /* updated on client with each use */
                /*
                 * This value is set to lbolt
                 * every time a client stream sends or receives data.
                 * Even if the timer message arrives, we don't shut down
                 * the client unless:
                 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
                 * This value is kept in HZ.
                 */

        uint_t  *mir_max_msg_sizep;     /* Reference to sanity check size */
                /*
                 * This pointer is set to &clnt_max_msg_size or
                 * &svc_max_msg_size during the appropriate ioctl.
                 */
        zoneid_t mir_zoneid;    /* zone which pushed rpcmod */
        /* Server-side fields. */
        int     mir_ref_cnt;            /* Reference count: server side only */
                                        /* counts the number of references */
                                        /* that a kernel RPC server thread */
                                        /* (see svc_run()) has on this rpcmod */
                                        /* slot. Effectively, it is the */
                                        /* number of unprocessed messages */
                                        /* that have been passed up to the */
                                        /* kRPC layer */

        mblk_t  *mir_svc_pend_mp;       /* Pending T_ORDREL_IND or */
                                        /* T_DISCON_IND */

        /*
         * these fields are for both client and server, but for debugging,
         * it is easier to have these last in the structure.
         */
        kmutex_t        mir_mutex;      /* Mutex and condvar for close */
        kcondvar_t      mir_condvar;    /* synchronization. */
        kcondvar_t      mir_timer_cv;   /* Timer routine sync. */
} mir_t;

void tmp_rput(queue_t *q, mblk_t *mp);

struct xprt_style_ops tmpops = {
        NULL,
        NULL,
        putnext,
        NULL,
        tmp_rput,
        NULL
};

void
tmp_rput(queue_t *q, mblk_t *mp)
{
        struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
        struct T_info_ack *pptr;

        switch (mp->b_datap->db_type) {
        case M_PCPROTO:
                pptr = (struct T_info_ack *)mp->b_rptr;
                switch (pptr->PRIM_type) {
                case T_INFO_ACK:
                        mutex_enter(&t->lock);
                        t->info_ack = mp;
                        cv_signal(&t->wait);
                        mutex_exit(&t->lock);
                        return;
                default:
                        break;
                }
        default:
                break;
        }

        /*
         * Not an info-ack, so free it. This is ok because we should
         * not be receiving data until the open finishes: rpcmod
         * is pushed well before the end-point is bound to an address.
         */
        freemsg(mp);
}

int
rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
        mblk_t *bp;
        struct temp_slot ts, *t;
        struct T_info_ack *pptr;
        int error = 0;

        ASSERT(q != NULL);
        /*
         * Check for re-opens.
         */
        if (q->q_ptr) {
                TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
                    "rpcmodopen_end:(%s)", "q->qptr");
                return (0);
        }

        t = &ts;
        bzero(t, sizeof (*t));
        q->q_ptr = (void *)t;
        WR(q)->q_ptr = (void *)t;

        /*
         * Allocate the required messages upfront.
         */
        if ((bp = allocb_cred(sizeof (struct T_info_req) +
            sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
                return (ENOBUFS);
        }

        mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&t->wait, NULL, CV_DEFAULT, NULL);

        t->ops = &tmpops;

        qprocson(q);
        bp->b_datap->db_type = M_PCPROTO;
        *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
        bp->b_wptr += sizeof (struct T_info_req);
        putnext(WR(q), bp);

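        /*
         * Synchronous TPI handshake: the T_INFO_REQ just sent downstream
         * will be answered with a T_INFO_ACK, which tmp_rput() stashes in
         * t->info_ack before signalling t->wait.  The SERV_type in the ack
         * decides which personality this stream gets below.
         */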
        mutex_enter(&t->lock);
        while (t->info_ack == NULL) {
                if (cv_wait_sig(&t->wait, &t->lock) == 0) {
                        error = EINTR;
                        break;
                }
        }
        mutex_exit(&t->lock);

        if (error)
                goto out;

        pptr = (struct T_info_ack *)t->info_ack->b_rptr;

        if (pptr->SERV_type == T_CLTS) {
                if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
                        ((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
        } else {
                if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
                        ((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
        }

out:
        if (error)
                qprocsoff(q);

        freemsg(t->info_ack);
        mutex_destroy(&t->lock);
        cv_destroy(&t->wait);

        return (error);
}
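
/*
 * The rmm_* routines below simply dispatch through the per-stream
 * xprt_style_ops vector, so the single pair of qinit structures above
 * serves both the CLTS (rpcmod*) and COTS (mir_*) personalities chosen
 * at open time.
 */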

void
rmm_rput(queue_t *q, mblk_t *mp)
{
        (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
}

void
rmm_rsrv(queue_t *q)
{
        (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
}

void
rmm_wput(queue_t *q, mblk_t *mp)
{
        (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
}

void
rmm_wsrv(queue_t *q)
{
        (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
}

int
rmm_close(queue_t *q, int flag, cred_t *crp)
{
        return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}

/*
 * rpcmodopen - open routine gets called when the module gets pushed
 *              onto the stream.
 */
/*ARGSUSED*/
int
rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
        struct rpcm *rmp;

        TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");

        /*
         * Only sufficiently privileged users can use this module, and it
         * is assumed that they will use this module properly, and NOT send
         * bulk data from downstream.
         */
        if (secpolicy_rpcmod_open(crp) != 0)
                return (EPERM);

        /*
         * Allocate slot data structure.
         */
        rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);

        mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
        rmp->rm_zoneid = rpc_zoneid();
        /*
         * slot type will be set by kRPC client and server ioctl's
         */
        rmp->rm_type = 0;

        q->q_ptr = (void *)rmp;
        WR(q)->q_ptr = (void *)rmp;

        TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
        return (0);
}

/*
 * rpcmodclose - This routine gets called when the module gets popped
 * off of the stream.
 */
/*ARGSUSED*/
int
rpcmodclose(queue_t *q, int flag, cred_t *crp)
{
        struct rpcm *rmp;

        ASSERT(q != NULL);
        rmp = (struct rpcm *)q->q_ptr;

        /*
         * Mark our state as closing.
         */
        mutex_enter(&rmp->rm_lock);
        rmp->rm_state |= RM_CLOSING;

        /*
         * Check and see if there are any messages on the queue.  If so,
         * send the messages, regardless of whether the downstream module
         * is ready to accept data.
         */
        if (rmp->rm_type == RPC_SERVER) {
                flushq(q, FLUSHDATA);

                qenable(WR(q));

                if (rmp->rm_ref) {
                        mutex_exit(&rmp->rm_lock);
                        /*
                         * call into SVC to clean the queue
                         */
                        svc_queueclean(q);
                        mutex_enter(&rmp->rm_lock);

                        /*
                         * Block while there are kRPC threads with a reference
                         * to this message.
                         */
                        while (rmp->rm_ref)
                                cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
                }

                mutex_exit(&rmp->rm_lock);

                /*
                 * It is now safe to remove this queue from the stream. No kRPC
                 * threads have a reference to the stream, and none ever will,
                 * because RM_CLOSING is set.
                 */
                qprocsoff(q);

                /* Notify kRPC that this stream is going away. */
                svc_queueclose(q);
        } else {
                mutex_exit(&rmp->rm_lock);
                qprocsoff(q);
        }

        q->q_ptr = NULL;
        WR(q)->q_ptr = NULL;
        mutex_destroy(&rmp->rm_lock);
        cv_destroy(&rmp->rm_cwait);
        kmem_free(rmp, sizeof (*rmp));
        return (0);
}

/*
 * rpcmodrput - Module read put procedure.  This is called from
 *              the module, driver, or stream head downstream.
 */
void
rpcmodrput(queue_t *q, mblk_t *mp)
{
        struct rpcm *rmp;
        union T_primitives *pptr;
        int hdrsz;

        TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");

        ASSERT(q != NULL);
        rmp = (struct rpcm *)q->q_ptr;

        if (rmp->rm_type == 0) {
                freemsg(mp);
                return;
        }

        switch (mp->b_datap->db_type) {
        default:
                putnext(q, mp);
                break;

        case M_PROTO:
        case M_PCPROTO:
                ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
                pptr = (union T_primitives *)mp->b_rptr;

                /*
                 * Forward this message to kRPC if it is data.
                 */
                if (pptr->type == T_UNITDATA_IND) {
                        /*
                         * Check if the module is being popped.
                         */
                        mutex_enter(&rmp->rm_lock);
                        if (rmp->rm_state & RM_CLOSING) {
                                mutex_exit(&rmp->rm_lock);
                                putnext(q, mp);
                                break;
                        }

                        switch (rmp->rm_type) {
                        case RPC_CLIENT:
                                mutex_exit(&rmp->rm_lock);
                                hdrsz = mp->b_wptr - mp->b_rptr;

                                /*
                                 * Make sure the header is sane.
                                 */
                                if (hdrsz < TUNITDATAINDSZ ||
                                    hdrsz < (pptr->unitdata_ind.OPT_length +
                                    pptr->unitdata_ind.OPT_offset) ||
                                    hdrsz < (pptr->unitdata_ind.SRC_length +
                                    pptr->unitdata_ind.SRC_offset)) {
                                        freemsg(mp);
                                        return;
                                }

                                /*
                                 * Call clnt_clts_dispatch_notify, so that it
                                 * can pass the message to the proper caller.
                                 * Don't discard the header just yet since the
                                 * client may need the sender's address.
                                 */
                                clnt_clts_dispatch_notify(mp, hdrsz,
                                    rmp->rm_zoneid);
                                return;
                        case RPC_SERVER:
                                /*
                                 * rm_krpc_cell is exclusively used by the kRPC
                                 * CLTS server. Try to submit the message to
                                 * kRPC. Since this is an unreliable channel, we
                                 * can just free the message in case the kRPC
                                 * does not accept new messages.
                                 */
                                if (rmp->rm_krpc_cell &&
                                    svc_queuereq(q, mp, TRUE)) {
                                        /*
                                         * Raise the reference count on this
                                         * module to prevent it from being
                                         * popped before kRPC generates the
                                         * reply.
                                         */
                                        rmp->rm_ref++;
                                        mutex_exit(&rmp->rm_lock);
                                } else {
                                        mutex_exit(&rmp->rm_lock);
                                        freemsg(mp);
                                }
                                return;
                        default:
                                mutex_exit(&rmp->rm_lock);
                                freemsg(mp);
                                return;
                        } /* end switch(rmp->rm_type) */
                } else if (pptr->type == T_UDERROR_IND) {
                        mutex_enter(&rmp->rm_lock);
                        hdrsz = mp->b_wptr - mp->b_rptr;

                        /*
                         * Make sure the header is sane
                         */
                        if (hdrsz < TUDERRORINDSZ ||
                            hdrsz < (pptr->uderror_ind.OPT_length +
                            pptr->uderror_ind.OPT_offset) ||
                            hdrsz < (pptr->uderror_ind.DEST_length +
                            pptr->uderror_ind.DEST_offset)) {
                                mutex_exit(&rmp->rm_lock);
                                freemsg(mp);
                                return;
                        }

                        /*
                         * In the case where a unit data error has been
                         * received, all we need to do is clear the message from
                         * the queue.
                         */
                        mutex_exit(&rmp->rm_lock);
                        freemsg(mp);
                        RPCLOG(32, "rpcmodrput: unitdata error received at "
                            "%ld\n", gethrestime_sec());
                        return;
                } /* end else if (pptr->type == T_UDERROR_IND) */

                putnext(q, mp);
                break;
        } /* end switch (mp->b_datap->db_type) */

        TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
            "rpcmodrput_end:");
        /*
         * Return codes are not looked at by the STREAMS framework.
         */
}

/*
 * write put procedure
 */
void
rpcmodwput(queue_t *q, mblk_t *mp)
{
        struct rpcm     *rmp;

        ASSERT(q != NULL);

        switch (mp->b_datap->db_type) {
                case M_PROTO:
                case M_PCPROTO:
                        break;
                default:
                        rpcmodwput_other(q, mp);
                        return;
        }

        /*
         * Check to see if we can send the message downstream.
         */
        if (canputnext(q)) {
                putnext(q, mp);
                return;
        }

        rmp = (struct rpcm *)q->q_ptr;
        ASSERT(rmp != NULL);

        /*
         * The first canputnext failed.  Try again, except this time with the
         * lock held, so that we can check the state of the stream to see if
         * it is closing.  If either of these conditions evaluates to true
         * then send the message.
         */
        mutex_enter(&rmp->rm_lock);
        if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
                mutex_exit(&rmp->rm_lock);
                putnext(q, mp);
        } else {
                /*
                 * canputnext failed again and the stream is not closing.
                 * Place the message on the queue and let the service
                 * procedure handle the message.
                 */
                mutex_exit(&rmp->rm_lock);
                (void) putq(q, mp);
        }
}

static void
rpcmodwput_other(queue_t *q, mblk_t *mp)
{
        struct rpcm     *rmp;
        struct iocblk   *iocp;

        rmp = (struct rpcm *)q->q_ptr;
        ASSERT(rmp != NULL);

        switch (mp->b_datap->db_type) {
                case M_IOCTL:
                        iocp = (struct iocblk *)mp->b_rptr;
                        ASSERT(iocp != NULL);
                        switch (iocp->ioc_cmd) {
                                case RPC_CLIENT:
                                case RPC_SERVER:
                                        mutex_enter(&rmp->rm_lock);
                                        rmp->rm_type = iocp->ioc_cmd;
                                        mutex_exit(&rmp->rm_lock);
                                        mp->b_datap->db_type = M_IOCACK;
                                        qreply(q, mp);
                                        return;
                                default:
                                /*
                                 * pass the ioctl downstream and hope someone
                                 * down there knows how to handle it.
                                 */
                                        putnext(q, mp);
                                        return;
                        }
                default:
                        break;
        }
        /*
         * This is something we definitely do not know how to handle; just
         * pass the message downstream.
         */
        putnext(q, mp);
}

/*
 * Module write service procedure. This is called by downstream modules
 * for back enabling during flow control.
 */
void
rpcmodwsrv(queue_t *q)
{
        struct rpcm     *rmp;
        mblk_t          *mp = NULL;

        rmp = (struct rpcm *)q->q_ptr;
        ASSERT(rmp != NULL);

        /*
         * Get messages that may be queued and send them downstream.
         */
        while ((mp = getq(q)) != NULL) {
                /*
                 * Optimize the service procedure for the server-side, by
                 * avoiding a call to canputnext().
                 */
                if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
                        putnext(q, mp);
                        continue;
                }
                (void) putbq(q, mp);
                return;
        }
}

void
rpcmod_hold(queue_t *q)
{
        struct rpcm *rmp = (struct rpcm *)q->q_ptr;

        mutex_enter(&rmp->rm_lock);
        rmp->rm_ref++;
        mutex_exit(&rmp->rm_lock);
}

void
rpcmod_release(queue_t *q, mblk_t *bp,
    /* LINTED E_FUNC_ARG_UNUSED */
    bool_t enable __unused)
{
        struct rpcm *rmp;

        /*
         * For now, just free the message.
         */
        if (bp)
                freemsg(bp);
        rmp = (struct rpcm *)q->q_ptr;

        mutex_enter(&rmp->rm_lock);
        rmp->rm_ref--;

        if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
                cv_broadcast(&rmp->rm_cwait);
        }

        mutex_exit(&rmp->rm_lock);
}

/*
 * This part of rpcmod is pushed on a connection-oriented transport for use
 * by RPC.  It serves to bypass the stream head, implement the record
 * marking protocol, and dispatch incoming RPC messages.
 */

/* Default idle timer values */
#define MIR_CLNT_IDLE_TIMEOUT   (5 * (60 * 1000L))      /* 5 minutes */
#define MIR_SVC_IDLE_TIMEOUT    (6 * (60 * 1000L))      /* 6 minutes */
#define MIR_SVC_ORDREL_TIMEOUT  (10 * (60 * 1000L))     /* 10 minutes */
#define MIR_LASTFRAG    0x80000000      /* Record marker */
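
/*
 * RPC over a byte-stream transport uses record marking (RFC 5531): each
 * fragment is preceded by a 4-byte big-endian header whose top bit
 * (MIR_LASTFRAG) flags the last fragment of a record and whose low 31 bits
 * give the fragment length.  For example, a header of 0x80000070 announces
 * a final fragment of 0x70 (112) bytes.
 */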

#define MIR_SVC_QUIESCED(mir)   \
        (mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)

#define MIR_CLEAR_INRSRV(mir_ptr)       {       \
        (mir_ptr)->mir_inrservice = 0;       \
        if ((mir_ptr)->mir_type == RPC_SERVER &&     \
                (mir_ptr)->mir_closing)      \
                cv_signal(&(mir_ptr)->mir_condvar);      \
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define MIR_WCANPUTNEXT(mir_ptr, write_q)       \
        (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))

static int      mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void     mir_rput_proto(queue_t *q, mblk_t *mp);
static int      mir_svc_policy_notify(queue_t *q, int event);
static void     mir_svc_start(queue_t *wq);
static void     mir_svc_idle_start(queue_t *, mir_t *);
static void     mir_svc_idle_stop(queue_t *, mir_t *);
static void     mir_svc_start_close(queue_t *, mir_t *);
static void     mir_clnt_idle_do_stop(queue_t *);
static void     mir_clnt_idle_stop(queue_t *, mir_t *);
static void     mir_clnt_idle_start(queue_t *, mir_t *);
static void     mir_wput(queue_t *q, mblk_t *mp);
static void     mir_wput_other(queue_t *q, mblk_t *mp);
static void     mir_wsrv(queue_t *q);
static  void    mir_disconnect(queue_t *, mir_t *ir);
static  int     mir_check_len(queue_t *, mblk_t *);
static  void    mir_timer(void *);

extern void     (*mir_start)(queue_t *);
extern void     (*clnt_stop_idle)(queue_t *);

volatile clock_t        clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
volatile clock_t        svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;

/*
 * Timeout for subsequent notifications of idle connection.  This is
 * typically used to clean up after a wedged orderly release.
 */
clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */

extern  uint_t  *clnt_max_msg_sizep;
extern  uint_t  *svc_max_msg_sizep;
uint_t  clnt_max_msg_size = RPC_MAXDATASIZE;
uint_t  svc_max_msg_size = RPC_MAXDATASIZE;
uint_t  mir_krpc_cell_null;

static void
mir_timer_stop(mir_t *mir)
{
        timeout_id_t tid;

        ASSERT(MUTEX_HELD(&mir->mir_mutex));

        /*
         * Since the mir_mutex lock needs to be released to call
         * untimeout(), we need to make sure that no other thread
         * can start/stop the timer (changing mir_timer_id) during
         * that time.  The mir_timer_call bit and the mir_timer_cv
         * condition variable are used to synchronize this.  Setting
         * mir_timer_call also tells mir_timer() (refer to the comments
         * in mir_timer()) that it does not need to do anything.
         */
        while (mir->mir_timer_call)
                cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
        mir->mir_timer_call = B_TRUE;

        if ((tid = mir->mir_timer_id) != 0) {
                mir->mir_timer_id = 0;
                mutex_exit(&mir->mir_mutex);
                (void) untimeout(tid);
                mutex_enter(&mir->mir_mutex);
        }
        mir->mir_timer_call = B_FALSE;
        cv_broadcast(&mir->mir_timer_cv);
}

static void
mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
{
        timeout_id_t tid;

        ASSERT(MUTEX_HELD(&mir->mir_mutex));

        while (mir->mir_timer_call)
                cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
        mir->mir_timer_call = B_TRUE;

        if ((tid = mir->mir_timer_id) != 0) {
                mutex_exit(&mir->mir_mutex);
                (void) untimeout(tid);
                mutex_enter(&mir->mir_mutex);
        }
        /* Only start the timer when it is not closing. */
        if (!mir->mir_closing) {
                mir->mir_timer_id = timeout(mir_timer, q,
                    MSEC_TO_TICK(intrvl));
        }
        mir->mir_timer_call = B_FALSE;
        cv_broadcast(&mir->mir_timer_cv);
}

static int
mir_clnt_dup_request(queue_t *q, mblk_t *mp)
{
        mblk_t  *mp1;
        uint32_t  new_xid;
        uint32_t  old_xid;

        ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
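        /*
         * The first four bytes of each queued request are the record mark,
         * so the RPC XID sits at byte offset 4; matching XIDs identify a
         * retransmission of a request that is still queued.
         */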
        new_xid = BE32_TO_U32(&mp->b_rptr[4]);
        /*
         * This loop is a bit tacky -- it walks the STREAMS list of
         * flow-controlled messages.
         */
        if ((mp1 = q->q_first) != NULL) {
                do {
                        old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
                        if (new_xid == old_xid)
                                return (1);
                } while ((mp1 = mp1->b_next) != NULL);
        }
        return (0);
}

static int
mir_close(queue_t *q)
{
        mir_t   *mir = q->q_ptr;
        mblk_t  *mp;
        bool_t queue_cleaned = FALSE;

        RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
        mutex_enter(&mir->mir_mutex);
        if ((mp = mir->mir_head_mp) != NULL) {
                mir->mir_head_mp = NULL;
                mir->mir_tail_mp = NULL;
                freemsg(mp);
        }
        /*
         * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
         * becomes TRUE, and so mir_timer_start() won't start the timer
         * again.
         */
        mir->mir_closing = B_TRUE;
        mir_timer_stop(mir);

        if (mir->mir_type == RPC_SERVER) {
                flushq(q, FLUSHDATA);   /* Ditch anything waiting on read q */

                /*
                 * This will prevent more requests from arriving and
                 * will force rpcmod to ignore flow control.
                 */
                mir_svc_start_close(WR(q), mir);

                while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {

                        if (mir->mir_ref_cnt && !mir->mir_inrservice &&
                            (queue_cleaned == FALSE)) {
                                /*
                                 * call into SVC to clean the queue
                                 */
                                mutex_exit(&mir->mir_mutex);
                                svc_queueclean(q);
                                queue_cleaned = TRUE;
                                mutex_enter(&mir->mir_mutex);
                                continue;
                        }

                        /*
                         * Bugid 1253810 - Force the write service
                         * procedure to send its messages, regardless of
                         * whether the downstream module is ready to
                         * accept data.
                         */
                        if (mir->mir_inwservice == 1)
                                qenable(WR(q));

                        cv_wait(&mir->mir_condvar, &mir->mir_mutex);
                }

                mutex_exit(&mir->mir_mutex);
                qprocsoff(q);

                /* Notify kRPC that this stream is going away. */
                svc_queueclose(q);
        } else {
                mutex_exit(&mir->mir_mutex);
                qprocsoff(q);
        }

        mutex_destroy(&mir->mir_mutex);
        cv_destroy(&mir->mir_condvar);
        cv_destroy(&mir->mir_timer_cv);
        kmem_free(mir, sizeof (mir_t));
        return (0);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Exit idle mode.
 */
static void
mir_svc_idle_stop(queue_t *q, mir_t *mir)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));
        ASSERT((q->q_flag & QREADR) == 0);
        ASSERT(mir->mir_type == RPC_SERVER);
        RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);

        mir_timer_stop(mir);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Start idle processing, which will include setting idle timer if the
 * stream is not being closed.
 */
static void
mir_svc_idle_start(queue_t *q, mir_t *mir)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));
        ASSERT((q->q_flag & QREADR) == 0);
        ASSERT(mir->mir_type == RPC_SERVER);
        RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);

        /*
         * Don't re-start idle timer if we are closing queues.
         */
        if (mir->mir_closing) {
                RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
                    (void *)q);

                /*
                 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
                 * is true.  When it is true, and we are in the process of
                 * closing the stream, signal any thread waiting in
                 * mir_close().
                 */
                if (mir->mir_inwservice == 0)
                        cv_signal(&mir->mir_condvar);

        } else {
                RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
                    mir->mir_ordrel_pending ? "ordrel" : "normal");
                /*
                 * Normal condition, start the idle timer.  If an orderly
                 * release has been sent, set the timeout to wait for the
                 * client to close its side of the connection.  Otherwise,
                 * use the normal idle timeout.
                 */
                mir_timer_start(q, mir, mir->mir_ordrel_pending ?
                    svc_ordrel_timeout : mir->mir_idle_timeout);
        }
}

/* ARGSUSED */
static int
mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
{
        mir_t   *mir;

        RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
        /* Set variables used directly by kRPC. */
        if (!mir_start)
                mir_start = mir_svc_start;
        if (!clnt_stop_idle)
                clnt_stop_idle = mir_clnt_idle_do_stop;
        if (!clnt_max_msg_sizep)
                clnt_max_msg_sizep = &clnt_max_msg_size;
        if (!svc_max_msg_sizep)
                svc_max_msg_sizep = &svc_max_msg_size;

        /* Allocate a zero'ed out mir structure for this stream. */
        mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);

        /*
         * We set hold inbound here so that incoming messages will
         * be held on the read-side queue until the stream is completely
         * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
         * the ioctl processing, the flag is cleared and any messages that
         * arrived between the open and the ioctl are delivered to kRPC.
         *
         * Early data should never arrive on a client stream since
         * servers only respond to our requests and we do not send any
         * until after the stream is initialized.  Early data is
         * very common on a server stream where the client will start
         * sending data as soon as the connection is made (and this
         * is especially true with TCP where the protocol accepts the
         * connection before nfsd or kRPC is notified about it).
         */

        mir->mir_hold_inbound = 1;

        /*
         * Start the record marker looking for a 4-byte header.  When
         * this length is negative, it indicates that rpcmod is looking
         * for bytes to consume for the record marker header.  When it
         * is positive, it holds the number of bytes that have arrived
         * for the current fragment and are being held in mir_head_mp.
         */

        mir->mir_frag_len = -(int32_t)sizeof (uint32_t);

        mir->mir_zoneid = rpc_zoneid();
        mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
        cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);

        q->q_ptr = (char *)mir;
        WR(q)->q_ptr = (char *)mir;

        /*
         * We noenable the read-side queue because we don't want it
         * automatically enabled by putq.  We enable it explicitly
         * in mir_wsrv when appropriate. (See additional comments on
         * flow control at the beginning of mir_rsrv.)
         */
        noenable(q);

        qprocson(q);
        return (0);
}

/*
 * Read-side put routine for both the client and server side.  Does the
 * record marking for incoming RPC messages, and when complete, dispatches
 * the message to either the client or server.
 */
static void
mir_rput(queue_t *q, mblk_t *mp)
{
        int     excess;
        int32_t frag_len, frag_header;
        mblk_t  *cont_mp, *head_mp, *tail_mp, *mp1;
        mir_t   *mir = q->q_ptr;
        boolean_t stop_timer = B_FALSE;

        ASSERT(mir != NULL);

        /*
         * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
         * with the corresponding ioctl, then don't accept
         * any inbound data.  This should never happen for streams
         * created by nfsd or client-side kRPC because they are careful
         * to set the mode of the stream before doing anything else.
         */
        if (mir->mir_type == 0) {
                freemsg(mp);
                return;
        }

        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

        switch (mp->b_datap->db_type) {
        case M_DATA:
                break;
        case M_PROTO:
        case M_PCPROTO:
                if (MBLKL(mp) < sizeof (t_scalar_t)) {
                        RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
                            (int)MBLKL(mp));
                        freemsg(mp);
                        return;
                }
                if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
                        mir_rput_proto(q, mp);
                        return;
                }

                /* Throw away the T_DATA_IND block and continue with data. */
                mp1 = mp;
                mp = mp->b_cont;
                freeb(mp1);
                break;
        case M_SETOPTS:
                /*
                 * If a module on the stream is trying to set the stream
                 * head's high water mark, then set our hiwater to the
                 * requested value.  We are the "stream head" for all inbound
                 * data messages since messages are passed directly to kRPC.
                 */
                if (MBLKL(mp) >= sizeof (struct stroptions)) {
                        struct stroptions       *stropts;

                        stropts = (struct stroptions *)mp->b_rptr;
                        if ((stropts->so_flags & SO_HIWAT) &&
                            !(stropts->so_flags & SO_BAND)) {
                                (void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
                        }
                }
                putnext(q, mp);
                return;
        case M_FLUSH:
                RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
                RPCLOG(32, "on q 0x%p\n", (void *)q);
                putnext(q, mp);
                return;
        default:
                putnext(q, mp);
                return;
        }

        mutex_enter(&mir->mir_mutex);

        /*
         * If this connection is closing, don't accept any new messages.
         */
        if (mir->mir_svc_no_more_msgs) {
                ASSERT(mir->mir_type == RPC_SERVER);
                mutex_exit(&mir->mir_mutex);
                freemsg(mp);
                return;
        }

        /* Get local copies for quicker access. */
        frag_len = mir->mir_frag_len;
        frag_header = mir->mir_frag_header;
        head_mp = mir->mir_head_mp;
        tail_mp = mir->mir_tail_mp;

        /* Loop, processing each message block in the mp chain separately. */
        do {
                cont_mp = mp->b_cont;
                mp->b_cont = NULL;

                /*
                 * Drop zero-length mblks to prevent unbounded kernel memory
                 * consumption.
                 */
                if (MBLKL(mp) == 0) {
                        freeb(mp);
                        continue;
                }

                /*
                 * If frag_len is negative, we're still in the process of
                 * building frag_header -- try to complete it with this mblk.
                 */
                while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
                        frag_len++;
                        frag_header <<= 8;
                        frag_header += *mp->b_rptr++;
                }
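                /*
                 * Example: the four marker bytes 0x80 0x00 0x00 0x70,
                 * consumed one at a time above, leave frag_header ==
                 * 0x80000070 (last fragment, 0x70 bytes) and frag_len == 0.
                 */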

                if (MBLKL(mp) == 0 && frag_len < 0) {
                        /*
                         * We consumed this mblk while trying to complete the
                         * fragment header.  Free it and move on.
                         */
                        freeb(mp);
                        continue;
                }

                ASSERT(frag_len >= 0);

                /*
                 * Now frag_header has the number of bytes in this fragment
                 * and we're just waiting to collect them all.  Chain our
                 * latest mblk onto the list and see if we now have enough
                 * bytes to complete the fragment.
                 */
                if (head_mp == NULL) {
                        ASSERT(tail_mp == NULL);
                        head_mp = tail_mp = mp;
                } else {
                        tail_mp->b_cont = mp;
                        tail_mp = mp;
                }

                frag_len += MBLKL(mp);
                excess = frag_len - (frag_header & ~MIR_LASTFRAG);
                if (excess < 0) {
                        /*
                         * We still haven't received enough data to complete
                         * the fragment, so continue on to the next mblk.
                         */
                        continue;
                }
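                /*
                 * Example: for a 100-byte fragment (frag_header low bits ==
                 * 100) arriving in mblks totalling frag_len == 120, excess
                 * is 20; those bytes belong to the next fragment header and
                 * are split off below.
                 */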
1477 
1478                 /*
1479                  * We've got a complete fragment.  If there are excess bytes,
1480                  * then they're part of the next fragment's header (of either
1481                  * this RPC message or the next RPC message).  Split that part
1482                  * into its own mblk so that we can safely freeb() it when
1483                  * building frag_header above.
1484                  */
1485                 if (excess > 0) {
1486                         if ((mp1 = dupb(mp)) == NULL &&
1487                             (mp1 = copyb(mp)) == NULL) {
1488                                 freemsg(head_mp);
1489                                 freemsg(cont_mp);
1490                                 RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
1491                                 mir->mir_frag_header = 0;
1492                                 mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1493                                 mir->mir_head_mp = NULL;
1494                                 mir->mir_tail_mp = NULL;
1495                                 mir_disconnect(q, mir); /* drops mir_mutex */
1496                                 return;
1497                         }
1498 
1499                         /*
1500                          * Relink the message chain so that the next mblk is
1501                          * the next fragment header, followed by the rest of
1502                          * the message chain.
1503                          */
1504                         mp1->b_cont = cont_mp;
1505                         cont_mp = mp1;
1506 
1507                         /*
1508                          * Data in the new mblk begins at the next fragment,
1509                          * and data in the old mblk ends at the next fragment.
1510                          */
1511                         mp1->b_rptr = mp1->b_wptr - excess;
1512                         mp->b_wptr -= excess;
1513                 }
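		/*
		 * mp now ends exactly on the fragment boundary, and any
		 * excess bytes sit at the front of cont_mp, so the next
		 * iteration of the loop starts on the next fragment header.
		 */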
1514 
1515                 /*
1516                  * Reset frag_len and frag_header for the next fragment.
1517                  */
1518                 frag_len = -(int32_t)sizeof (uint32_t);
1519                 if (!(frag_header & MIR_LASTFRAG)) {
1520                         /*
1521                          * The current fragment is complete, but more
1522                          * fragments need to be processed before we can
1523                          * pass along the RPC message headed at head_mp.
1524                          */
1525                         frag_header = 0;
1526                         continue;
1527                 }
1528                 frag_header = 0;
1529 
1530                 /*
1531                  * We've got a complete RPC message; pass it to the
1532                  * appropriate consumer.
1533                  */
1534                 switch (mir->mir_type) {
1535                 case RPC_CLIENT:
1536                         if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
1537                                 /*
1538                                  * Mark this stream as active.  This marker
1539                                  * is used in mir_timer().
1540                                  */
1541                                 mir->mir_clntreq = 1;
1542                                 mir->mir_use_timestamp = ddi_get_lbolt();
1543                         } else {
1544                                 freemsg(head_mp);
1545                         }
1546                         break;
1547 
1548                 case RPC_SERVER:
1549                         /*
1550                          * Check for flow control before passing the
1551                          * message to kRPC.
1552                          */
1553                         if (!mir->mir_hold_inbound) {
1554                                 if (mir->mir_krpc_cell) {
1555 
1556                                         if (mir_check_len(q, head_mp))
1557                                                 return;
1558 
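					/*
					 * Hand the request straight to kRPC
					 * only if nothing is already queued;
					 * otherwise it goes through putq()
					 * below so that it stays behind the
					 * messages queued ahead of it.
					 */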
1559                                         if (q->q_first == NULL &&
1560                                             svc_queuereq(q, head_mp, TRUE)) {
1561                                                 /*
1562                                                  * If the reference count is 0
1563                                                  * (not including this
1564                                                  * request), then the stream is
1565                                                  * transitioning from idle to
1566                                                  * non-idle.  In this case, we
1567                                                  * cancel the idle timer.
1568                                                  */
1569                                                 if (mir->mir_ref_cnt++ == 0)
1570                                                         stop_timer = B_TRUE;
1571                                         } else {
1572                                                 (void) putq(q, head_mp);
1573                                                 mir->mir_inrservice = B_TRUE;
1574                                         }
1575                                 } else {
1576                                         /*
1577                                          * Count the number of times this
1578                                          * happens.  It should never happen,
1579                                          * but experience shows otherwise.
1580                                          */
1581                                         mir_krpc_cell_null++;
1582                                         freemsg(head_mp);
1583                                 }
1584                         } else {
1585                                 /*
1586                                  * If the outbound side of the stream is
1587                                  * flow controlled, then hold this message
1588                                  * until client catches up. mir_hold_inbound
1589                                  * is set in mir_wput and cleared in mir_wsrv.
1590                                  */
1591                                 (void) putq(q, head_mp);
1592                                 mir->mir_inrservice = B_TRUE;
1593                         }
1594                         break;
1595                 default:
1596                         RPCLOG(1, "mir_rput: unknown mir_type %d\n",
1597                             mir->mir_type);
1598                         freemsg(head_mp);
1599                         break;
1600                 }
1601 
1602                 /*
1603                  * Reset the chain since we're starting on a new RPC message.
1604                  */
1605                 head_mp = tail_mp = NULL;
1606         } while ((mp = cont_mp) != NULL);
1607 
1608         /*
1609          * Sanity check the message length; if it's too large, mir_check_len()
1610          * will shut down the connection, drop mir_mutex, and return non-zero.
1611          */
1612         if (head_mp != NULL && mir->mir_setup_complete &&
1613             mir_check_len(q, head_mp))
1614                 return;
1615 
1616         /* Save our local copies back in the mir structure. */
1617         mir->mir_frag_header = frag_header;
1618         mir->mir_frag_len = frag_len;
1619         mir->mir_head_mp = head_mp;
1620         mir->mir_tail_mp = tail_mp;
1621 
1622         /*
1623          * The timer is stopped only after the whole message chain has been
1624          * processed, because stopping the timer releases the mir_mutex
1625          * lock temporarily.  That would allow the request to be serviced
1626          * while we are still processing the message chain, which is not
1627          * good.  So we stop the timer here instead.
1628          *
1629          * Note that if the timer fires before we stop it, it will not
1630          * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
1631          * will just return.
1632          */
1633         if (stop_timer) {
1634                 RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
1635                     "ref cnt going to non zero\n", (void *)WR(q));
1636                 mir_svc_idle_stop(WR(q), mir);
1637         }
1638         mutex_exit(&mir->mir_mutex);
1639 }
1640 
1641 static void
1642 mir_rput_proto(queue_t *q, mblk_t *mp)
1643 {
1644         mir_t   *mir = (mir_t *)q->q_ptr;
1645         uint32_t        type;
1646         uint32_t reason = 0;
1647 
1648         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1649 
1650         type = ((union T_primitives *)mp->b_rptr)->type;
1651         switch (mir->mir_type) {
1652         case RPC_CLIENT:
1653                 switch (type) {
1654                 case T_DISCON_IND:
1655                         reason = ((struct T_discon_ind *)
1656                             (mp->b_rptr))->DISCON_reason;
1657                         /*FALLTHROUGH*/
1658                 case T_ORDREL_IND:
1659                         mutex_enter(&mir->mir_mutex);
1660                         if (mir->mir_head_mp) {
1661                                 freemsg(mir->mir_head_mp);
1662                                 mir->mir_head_mp = NULL;
1663                                 mir->mir_tail_mp = NULL;
1664                         }
1665                         /*
1666                          * We are disconnecting, but not necessarily
1667                          * closing. By not closing, we will fail to
1668                          * pick up a possibly changed global timeout value,
1669                          * unless we store it now.
1670                          */
1671                         mir->mir_idle_timeout = clnt_idle_timeout;
1672                         mir_clnt_idle_stop(WR(q), mir);
1673 
1674                         /*
1675                          * Even though we are unconnected, we still
1676                          * leave the idle timer going on the client. The
1677                          * reason is that if we've disconnected due
1678                          * to a server-side disconnect, reset, or connection
1679                          * timeout, there is a possibility the client may
1680                          * retry the RPC request. This retry needs to be done
1681                          * on the same bound address for the server to
1682                          * interpret it as such. However, we don't want
1683                          * to wait forever for that possibility. If the
1684                          * end-point stays unconnected for mir_idle_timeout
1685                          * units of time, then that is a signal to the
1686                          * connection manager to give up waiting for the
1687                          * application (e.g. NFS) to send a retry.
1688                          */
1689                         mir_clnt_idle_start(WR(q), mir);
1690                         mutex_exit(&mir->mir_mutex);
1691                         clnt_dispatch_notifyall(WR(q), type, reason);
1692                         freemsg(mp);
1693                         return;
1694                 case T_ERROR_ACK:
1695                 {
1696                         struct T_error_ack      *terror;
1697 
1698                         terror = (struct T_error_ack *)mp->b_rptr;
1699                         RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
1700                             (void *)q);
1701                         RPCLOG(1, " ERROR_prim: %s,",
1702                             rpc_tpiprim2name(terror->ERROR_prim));
1703                         RPCLOG(1, " TLI_error: %s,",
1704                             rpc_tpierr2name(terror->TLI_error));
1705                         RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
1706                         if (terror->ERROR_prim == T_DISCON_REQ)  {
1707                                 clnt_dispatch_notifyall(WR(q), type, reason);
1708                                 freemsg(mp);
1709                                 return;
1710                         } else {
1711                                 if (clnt_dispatch_notifyconn(WR(q), mp))
1712                                         return;
1713                         }
1714                         break;
1715                 }
1716                 case T_OK_ACK:
1717                 {
1718                         struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr;
1719 
1720                         if (tok->CORRECT_prim == T_DISCON_REQ) {
1721                                 clnt_dispatch_notifyall(WR(q), type, reason);
1722                                 freemsg(mp);
1723                                 return;
1724                         } else {
1725                                 if (clnt_dispatch_notifyconn(WR(q), mp))
1726                                         return;
1727                         }
1728                         break;
1729                 }
1730                 case T_CONN_CON:
1731                 case T_INFO_ACK:
1732                 case T_OPTMGMT_ACK:
1733                         if (clnt_dispatch_notifyconn(WR(q), mp))
1734                                 return;
1735                         break;
1736                 case T_BIND_ACK:
1737                         break;
1738                 default:
1739                         RPCLOG(1, "mir_rput: unexpected message %d "
1740                             "for kRPC client\n",
1741                             ((union T_primitives *)mp->b_rptr)->type);
1742                         break;
1743                 }
1744                 break;
1745 
1746         case RPC_SERVER:
1747                 switch (type) {
1748                 case T_BIND_ACK:
1749                 {
1750                         struct T_bind_ack       *tbind;
1751 
1752                         /*
1753                          * If this is a listening stream, then shut
1754                          * off the idle timer.
1755                          */
1756                         tbind = (struct T_bind_ack *)mp->b_rptr;
1757                         if (tbind->CONIND_number > 0) {
1758                                 mutex_enter(&mir->mir_mutex);
1759                                 mir_svc_idle_stop(WR(q), mir);
1760 
1761                                 /*
1762                                  * Mark this as a listen endpoint
1763                                  * for special handling.
1764                                  */
1765                                 mir->mir_listen_stream = 1;
1767                                 mutex_exit(&mir->mir_mutex);
1768                         }
1769                         break;
1770                 }
1771                 case T_DISCON_IND:
1772                 case T_ORDREL_IND:
1773                         RPCLOG(16, "mir_rput_proto: got %s indication\n",
1774                             type == T_DISCON_IND ? "disconnect"
1775                             : "orderly release");
1776 
1777                         /*
1778                          * For a listen endpoint just pass
1779                          * on the message.
1780                          */
1781 
1782                         if (mir->mir_listen_stream)
1783                                 break;
1784 
1785                         mutex_enter(&mir->mir_mutex);
1786 
1787                         /*
1788                          * If the client wants to break off the connection,
1789                          * record that fact.
1790                          */
1791                         mir_svc_start_close(WR(q), mir);
1792 
1793                         /*
1794                          * If we are idle, then send the orderly release
1795                          * or disconnect indication to nfsd.
1796                          */
1797                         if (MIR_SVC_QUIESCED(mir)) {
1798                                 mutex_exit(&mir->mir_mutex);
1799                                 break;
1800                         }
1801 
1802                         RPCLOG(16, "mir_rput_proto: not idle, so "
1803                             "disconnect/ord rel indication not passed "
1804                             "upstream on 0x%p\n", (void *)q);
1805 
1806                         /*
1807                          * Hold the indication until we go idle.
1808                          * If there already is an indication stored,
1809                          * replace it if the new one is a disconnect. The
1810                          * reasoning is that a disconnect takes less time
1811                          * to process, and once a client decides to
1812                          * disconnect, we should do that.
1813                          */
1814                         if (mir->mir_svc_pend_mp) {
1815                                 if (type == T_DISCON_IND) {
1816                                         RPCLOG(16, "mir_rput_proto: replacing"
1817                                             " held disconnect/ord rel"
1818                                             " indication with disconnect on"
1819                                             " 0x%p\n", (void *)q);
1820 
1821                                         freemsg(mir->mir_svc_pend_mp);
1822                                         mir->mir_svc_pend_mp = mp;
1823                                 } else {
1824                                         RPCLOG(16, "mir_rput_proto: already "
1825                                             "held a disconnect/ord rel "
1826                                             "indication. freeing ord rel "
1827                                             "ind on 0x%p\n", (void *)q);
1828                                         freemsg(mp);
1829                                 }
1830                         } else
1831                                 mir->mir_svc_pend_mp = mp;
1832 
1833                         mutex_exit(&mir->mir_mutex);
1834                         return;
1835 
1836                 default:
1837                         /* nfsd handles server-side non-data messages. */
1838                         break;
1839                 }
1840                 break;
1841 
1842         default:
1843                 break;
1844         }
1845 
1846         putnext(q, mp);
1847 }
1848 
1849 /*
1850  * The server-side read queues are used to hold inbound messages while
1851  * outbound flow control is exerted.  When outbound flow control is
1852  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
1853  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
1854  */
1855 static void
1856 mir_rsrv(queue_t *q)
1857 {
1858         mir_t   *mir;
1859         mblk_t  *mp;
1860         boolean_t stop_timer = B_FALSE;
1861 
1862         mir = (mir_t *)q->q_ptr;
1863         mutex_enter(&mir->mir_mutex);
1864 
1865         mp = NULL;
1866         switch (mir->mir_type) {
1867         case RPC_SERVER:
1868                 if (mir->mir_ref_cnt == 0)
1869                         mir->mir_hold_inbound = 0;
1870                 if (mir->mir_hold_inbound)
1871                         break;
1872 
1873                 while ((mp = getq(q)) != NULL) {
1874                         if (mir->mir_krpc_cell &&
1875                             (mir->mir_svc_no_more_msgs == 0)) {
1876 
1877                                 if (mir_check_len(q, mp))
1878                                         return;
1879 
1880                                 if (svc_queuereq(q, mp, TRUE)) {
1881                                         /*
1882                                          * If we were idle, turn off idle timer
1883                                          * since we aren't idle any more.
1884                                          */
1885                                         if (mir->mir_ref_cnt++ == 0)
1886                                                 stop_timer = B_TRUE;
1887                                 } else {
1888                                         (void) putbq(q, mp);
1889                                         break;
1890                                 }
1891                         } else {
1892                                 /*
1893                                  * Count how often this happens; it should
1894                                  * never, but experience shows otherwise.
1895                                  */
1896                                 if (mir->mir_krpc_cell == NULL)
1897                                         mir_krpc_cell_null++;
1898                                 freemsg(mp);
1899                         }
1900                 }
1901                 break;
1902         case RPC_CLIENT:
1903                 break;
1904         default:
1905                 RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
1906 
1907                 if (q->q_first == NULL)
1908                         MIR_CLEAR_INRSRV(mir);
1909 
1910                 mutex_exit(&mir->mir_mutex);
1911 
1912                 return;
1913         }
1914 
1915         /*
1916          * The timer is stopped only after all the messages have been
1917          * processed, because stopping the timer releases the mir_mutex
1918          * lock temporarily.  That would allow a request to be serviced
1919          * while we are still processing the message queue, which is not
1920          * good.  So we stop the timer here instead.
1921          */
1922         if (stop_timer)  {
1923                 RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
1924                     "cnt going to non zero\n", (void *)WR(q));
1925                 mir_svc_idle_stop(WR(q), mir);
1926         }
1927 
1928         if (q->q_first == NULL) {
1929                 mblk_t  *cmp = NULL;
1930 
1931                 MIR_CLEAR_INRSRV(mir);
1932 
1933                 if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
1934                         cmp = mir->mir_svc_pend_mp;
1935                         mir->mir_svc_pend_mp = NULL;
1936                 }
1937 
1938                 mutex_exit(&mir->mir_mutex);
1939 
1940                 if (cmp != NULL) {
1941                         RPCLOG(16, "mir_rsrv: line %d: sending a held "
1942                             "disconnect/ord rel indication upstream\n",
1943                             __LINE__);
1944                         putnext(q, cmp);
1945                 }
1946 
1947                 return;
1948         }
1949         mutex_exit(&mir->mir_mutex);
1950 }
1951 
1952 static int mir_svc_policy_fails;
1953 
1954 /*
1955  * Called to send an event code to nfsd/lockd so that it initiates
1956  * connection close.
1957  */
1958 static int
1959 mir_svc_policy_notify(queue_t *q, int event)
1960 {
1961         mblk_t  *mp;
1962 #ifdef DEBUG
1963         mir_t *mir = (mir_t *)q->q_ptr;
1964         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1965 #endif
1966         ASSERT(q->q_flag & QREADR);
1967 
1968         /*
1969          * Create an M_DATA message with the event code and pass it to the
1970          * Stream head (nfsd or whoever created the stream will consume it).
1971          */
1972         mp = allocb(sizeof (int), BPRI_HI);
1973 
1974         if (mp == NULL) {
1976                 mir_svc_policy_fails++;
1977                 RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
1978                     "%d\n", event);
1979                 return (ENOMEM);
1980         }
1981 
1982         U32_TO_BE32(event, mp->b_rptr);
1983         mp->b_wptr = mp->b_rptr + sizeof (int);
1984         putnext(q, mp);
1985         return (0);
1986 }
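/*
 * A minimal sketch of how the stream-head consumer (nfsd, lockd, or
 * whoever created the stream) might pick up the event code; this is
 * hypothetical user-level code, not part of this module:
 *
 *	int32_t event;
 *
 *	if (read(fd, &event, sizeof (event)) == sizeof (event))
 *		event = ntohl(event);	(U32_TO_BE32 stored it big-endian)
 *
 * Event 1 asks the daemon to close the stream (see mir_timer) and event 2
 * asks it to disconnect the stream (see mir_disconnect).
 */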
1987 
1988 /*
1989  * Server side: start the close phase. We want to get this rpcmod slot into
1990  * an idle state before mir_close() is called.
1991  */
1992 static void
1993 mir_svc_start_close(queue_t *wq, mir_t *mir)
1994 {
1995         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1996         ASSERT((wq->q_flag & QREADR) == 0);
1997         ASSERT(mir->mir_type == RPC_SERVER);
1998 
1999         /*
2000          * Do not accept any more messages.
2001          */
2002         mir->mir_svc_no_more_msgs = 1;
2003 
2004         /*
2005          * The next two statements make the read service procedure
2006          * free everything stuck in the stream's read queue.
2007          * It's not strictly necessary, because enabling the write queue
2008          * will have the same effect, but why not speed the process along?
2009          */
2010         mir->mir_hold_inbound = 0;
2011         qenable(RD(wq));
2012 
2013         /*
2014          * Meanwhile force the write service procedure to send the
2015          * responses downstream, regardless of flow control.
2016          */
2017         qenable(wq);
2018 }
2019 
2020 void
2021 mir_svc_hold(queue_t *wq)
2022 {
2023         mir_t *mir = (mir_t *)wq->q_ptr;
2024 
2025         mutex_enter(&mir->mir_mutex);
2026         mir->mir_ref_cnt++;
2027         mutex_exit(&mir->mir_mutex);
2028 }
2029 
2030 /*
2031  * This routine is called directly by kRPC after a request is completed,
2032  * whether a reply was sent or the request was dropped.
2033  */
2034 void
2035 mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
2036 {
2037         mir_t   *mir = (mir_t *)wq->q_ptr;
2038         mblk_t  *cmp = NULL;
2039 
2040         ASSERT((wq->q_flag & QREADR) == 0);
2041         if (mp)
2042                 freemsg(mp);
2043 
2044         if (enable)
2045                 qenable(RD(wq));
2046 
2047         mutex_enter(&mir->mir_mutex);
2048 
2049         /*
2050          * On the last reference, pick up any held disconnect/ordrel indication.
2051          */
2052         if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
2053                 cmp = mir->mir_svc_pend_mp;
2054                 mir->mir_svc_pend_mp = NULL;
2055         }
2056 
2057         if (cmp) {
2058                 RPCLOG(16, "mir_svc_release: sending a held "
2059                     "disconnect/ord rel indication upstream on queue 0x%p\n",
2060                     (void *)RD(wq));
2061 
2062                 mutex_exit(&mir->mir_mutex);
2063 
2064                 putnext(RD(wq), cmp);
2065 
2066                 mutex_enter(&mir->mir_mutex);
2067         }
2068 
2069         /*
2070          * Start idle processing if this is the last reference.
2071          */
2072         if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {
2073 
2074                 RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
2075                     "because ref cnt is zero\n", (void *) wq);
2076 
2077                 mir_svc_idle_start(wq, mir);
2078         }
2079 
2080         mir->mir_ref_cnt--;
2081         ASSERT(mir->mir_ref_cnt >= 0);
2082 
2083         /*
2084          * Wake up the thread waiting to close.
2085          */
2086 
2087         if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
2088                 cv_signal(&mir->mir_condvar);
2089 
2090         mutex_exit(&mir->mir_mutex);
2091 }
2092 
2093 /*
2094  * This routine is called by server-side kRPC when it is ready to
2095  * handle inbound messages on the stream.
2096  */
2097 static void
2098 mir_svc_start(queue_t *wq)
2099 {
2100         mir_t   *mir = (mir_t *)wq->q_ptr;
2101 
2102         /*
2103          * We no longer need to take the mir_mutex here because the
2104          * mir_setup_complete field has been moved out of
2105          * the bit field protected by the mir_mutex.
2106          */
2107 
2108         mir->mir_setup_complete = 1;
2109         qenable(RD(wq));
2110 }
2111 
2112 /*
2113  * client side wrapper for stopping timer with normal idle timeout.
2114  */
2115 static void
2116 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
2117 {
2118         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2119         ASSERT((wq->q_flag & QREADR) == 0);
2120         ASSERT(mir->mir_type == RPC_CLIENT);
2121 
2122         mir_timer_stop(mir);
2123 }
2124 
2125 /*
2126  * client side wrapper for starting timer with normal idle timeout.
2127  */
2128 static void
2129 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
2130 {
2131         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2132         ASSERT((wq->q_flag & QREADR) == 0);
2133         ASSERT(mir->mir_type == RPC_CLIENT);
2134 
2135         mir_timer_start(wq, mir, mir->mir_idle_timeout);
2136 }
2137 
2138 /*
2139  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
2140  * end-points that aren't connected.
2141  */
2142 static void
2143 mir_clnt_idle_do_stop(queue_t *wq)
2144 {
2145         mir_t   *mir = (mir_t *)wq->q_ptr;
2146 
2147         RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
2148         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2149         mutex_enter(&mir->mir_mutex);
2150         mir_clnt_idle_stop(wq, mir);
2151         mutex_exit(&mir->mir_mutex);
2152 }
2153 
2154 /*
2155  * Timer handler.  It handles idle timeouts and memory shortage problems.
2156  */
2157 static void
2158 mir_timer(void *arg)
2159 {
2160         queue_t *wq = (queue_t *)arg;
2161         mir_t *mir = (mir_t *)wq->q_ptr;
2162         boolean_t notify;
2163         clock_t now;
2164 
2165         mutex_enter(&mir->mir_mutex);
2166 
2167         /*
2168          * mir_timer_call is set only while mir_timer_start() or
2169          * mir_timer_stop() is in progress, and mir_timer() can only run
2170          * concurrently with them if the timer is being stopped.  So just
2171          * return.
2172          */
2173         if (mir->mir_timer_call) {
2174                 mutex_exit(&mir->mir_mutex);
2175                 return;
2176         }
2177         mir->mir_timer_id = 0;
2178 
2179         switch (mir->mir_type) {
2180         case RPC_CLIENT:
2181 
2182                 /*
2183                  * For clients, the timer fires at clnt_idle_timeout
2184                  * intervals.  If the activity marker (mir_clntreq) is
2185                  * zero, then the stream has been idle since the last
2186                  * timer event and we notify kRPC.  If mir_clntreq is
2187                  * non-zero, then the stream is active and we just
2188                  * restart the timer for another interval.  mir_clntreq
2189                  * is set to 1 in mir_wput for every request passed
2190                  * downstream.
2191                  *
2192                  * If this was a memory shortage timer, reset the idle
2193                  * timeout regardless; mir_clntreq will not be a
2194                  * valid indicator.
2195                  *
2196                  * The timer is initially started in mir_wput during
2197                  * RPC_CLIENT ioctl processing.
2198                  *
2199                  * The timer interval can be changed for individual
2200                  * streams with the ND variable "mir_idle_timeout".
2201                  */
2202                 now = ddi_get_lbolt();
2203                 if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
2204                     MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
2205                         clock_t tout;
2206 
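			/*
			 * Note the mixed units: mir_idle_timeout is in
			 * milliseconds, while ddi_get_lbolt() timestamps are
			 * in ticks, hence the MSEC_TO_TICK()/TICK_TO_MSEC()
			 * conversions here.
			 */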
2207                         tout = mir->mir_idle_timeout -
2208                             TICK_TO_MSEC(now - mir->mir_use_timestamp);
2209                         if (tout < 0)
2210                                 tout = 1000;
2211 #if 0
2212                         printf("mir_timer[%d < %d + %d]: reset client timer "
2213                             "to %d (ms)\n", TICK_TO_MSEC(now),
2214                             TICK_TO_MSEC(mir->mir_use_timestamp),
2215                             mir->mir_idle_timeout, tout);
2216 #endif
2217                         mir->mir_clntreq = 0;
2218                         mir_timer_start(wq, mir, tout);
2219                         mutex_exit(&mir->mir_mutex);
2220                         return;
2221                 }
2222 #if 0
2223                 printf("mir_timer[%d]: doing client timeout\n", now / hz);
2224 #endif
2225                 /*
2226                  * We are disconnecting, but not necessarily
2227                  * closing. By not closing, we will fail to
2228                  * pick up a possibly changed global timeout value,
2229                  * unless we store it now.
2230                  */
2231                 mir->mir_idle_timeout = clnt_idle_timeout;
2232                 mir_clnt_idle_start(wq, mir);
2233 
2234                 mutex_exit(&mir->mir_mutex);
2235                 /*
2236                  * We pass T_ORDREL_REQ as an integer value
2237                  * to kRPC as the indication that the stream
2238                  * is idle.  This is not a T_ORDREL_REQ message,
2239                  * it is just a convenient value since we call
2240                  * the same kRPC routine for T_ORDREL_INDs and
2241                  * T_DISCON_INDs.
2242                  */
2243                 clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
2244                 return;
2245 
2246         case RPC_SERVER:
2247 
2248                 /*
2249                  * For servers, the timer is only running when the stream
2250                  * is really idle or memory is short.  The timer is started
2251                  * by mir_wput when mir_type is set to RPC_SERVER and
2252                  * by mir_svc_idle_start whenever the stream goes idle
2253                  * (mir_ref_cnt == 0).  The timer is cancelled in
2254                  * mir_rput whenever a new inbound request is passed to kRPC
2255                  * and the stream was previously idle.
2256                  *
2257                  * The timer interval can be changed for individual
2258                  * streams with the ND variable "mir_idle_timeout".
2259                  *
2260                  * If the stream is not idle do nothing.
2261                  */
2262                 if (!MIR_SVC_QUIESCED(mir)) {
2263                         mutex_exit(&mir->mir_mutex);
2264                         return;
2265                 }
2266 
2267                 notify = !mir->mir_inrservice;
2268                 mutex_exit(&mir->mir_mutex);
2269 
2270                 /*
2271                  * If there is no packet queued up in the read queue, the
2272                  * stream is really idle so notify nfsd to close it.
2273                  */
2274                 if (notify) {
2275                         RPCLOG(16, "mir_timer: telling stream head listener "
2276                             "to close stream (0x%p)\n", (void *) RD(wq));
2277                         (void) mir_svc_policy_notify(RD(wq), 1);
2278                 }
2279                 return;
2280         default:
2281                 RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
2282                     mir->mir_type);
2283                 mutex_exit(&mir->mir_mutex);
2284                 return;
2285         }
2286 }
2287 
2288 /*
2289  * Called by the RPC package to send a call, a return, or a
2290  * transport connection request.  Adds the record marking header.
2291  */
2292 static void
2293 mir_wput(queue_t *q, mblk_t *mp)
2294 {
2295         uint_t  frag_header;
2296         mir_t   *mir = (mir_t *)q->q_ptr;
2297         uchar_t *rptr = mp->b_rptr;
2298 
2299         if (!mir) {
2300                 freemsg(mp);
2301                 return;
2302         }
2303 
2304         if (mp->b_datap->db_type != M_DATA) {
2305                 mir_wput_other(q, mp);
2306                 return;
2307         }
2308 
2309         if (mir->mir_ordrel_pending == 1) {
2310                 freemsg(mp);
2311                 RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
2312                     (void *)q);
2313                 return;
2314         }
2315 
2316         frag_header = (uint_t)DLEN(mp);
2317         frag_header |= MIR_LASTFRAG;
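	/*
	 * kRPC hands us each RPC message as a single M_DATA message, so the
	 * record mark always covers the whole message: DLEN(mp) bytes with
	 * MIR_LASTFRAG set.  For example, a 300-byte message gets the header
	 * 0x8000012c.
	 */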
2318 
2319         /* Stick in the 4 byte record marking header. */
2320         if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
2321             !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
2322                 /*
2323                  * Since we know that M_DATA messages are created exclusively
2324                  * by kRPC, we expect kRPC to leave room for our header and
2325                  * to 4-byte align the data, which is normal for XDR.
2326                  * If kRPC (or someone else) does not cooperate, then we
2327                  * just throw away the message.
2328                  */
2329                 RPCLOG(1, "mir_wput: kRPC did not leave space for record "
2330                     "fragment header (%d bytes left)\n",
2331                     (int)(rptr - mp->b_datap->db_base));
2332                 freemsg(mp);
2333                 return;
2334         }
2335         rptr -= sizeof (uint32_t);
2336         *(uint32_t *)rptr = htonl(frag_header);
2337         mp->b_rptr = rptr;
2338 
2339         mutex_enter(&mir->mir_mutex);
2340         if (mir->mir_type == RPC_CLIENT) {
2341                 /*
2342                  * For the client, set mir_clntreq to indicate that the
2343                  * connection is active.
2344                  */
2345                 mir->mir_clntreq = 1;
2346                 mir->mir_use_timestamp = ddi_get_lbolt();
2347         }
2348 
2349         /*
2350          * If we haven't already queued some data and the downstream module
2351          * can accept more data, send it on.  Otherwise, queue the message
2352          * and take further action depending on mir_type.
2353          */
2354         if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
2355                 mutex_exit(&mir->mir_mutex);
2356 
2357                 /*
2358                  * Now we pass the RPC message downstream.
2359                  */
2360                 putnext(q, mp);
2361                 return;
2362         }
2363 
2364         switch (mir->mir_type) {
2365         case RPC_CLIENT:
2366                 /*
2367                  * Check for a previous duplicate request on the
2368                  * queue.  If there is one, then we throw away
2369                  * the current message and let the previous one
2370                  * go through.  If we can't find a duplicate, then
2371                  * send this one.  This tap dance is an effort
2372                  * to reduce traffic and processing requirements
2373                  * under load conditions.
2374                  */
2375                 if (mir_clnt_dup_request(q, mp)) {
2376                         mutex_exit(&mir->mir_mutex);
2377                         freemsg(mp);
2378                         return;
2379                 }
2380                 break;
2381         case RPC_SERVER:
2382                 /*
2383                  * Set mir_hold_inbound so that new inbound RPC
2384                  * messages will be held until the client catches
2385                  * up on the earlier replies.  This flag is cleared
2386                  * in mir_wsrv after flow control is relieved;
2387                  * the read-side queue is also enabled at that time.
2388                  */
2389                 mir->mir_hold_inbound = 1;
2390                 break;
2391         default:
2392                 RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
2393                 break;
2394         }
2395         mir->mir_inwservice = 1;
2396         (void) putq(q, mp);
2397         mutex_exit(&mir->mir_mutex);
2398 }
2399 
2400 static void
2401 mir_wput_other(queue_t *q, mblk_t *mp)
2402 {
2403         mir_t   *mir = (mir_t *)q->q_ptr;
2404         struct iocblk   *iocp;
2405         uchar_t *rptr = mp->b_rptr;
2406         bool_t  flush_in_svc = FALSE;
2407 
2408         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2409         switch (mp->b_datap->db_type) {
2410         case M_IOCTL:
2411                 iocp = (struct iocblk *)rptr;
2412                 switch (iocp->ioc_cmd) {
2413                 case RPC_CLIENT:
2414                         mutex_enter(&mir->mir_mutex);
2415                         if (mir->mir_type != 0 &&
2416                             mir->mir_type != iocp->ioc_cmd) {
2417 ioc_eperm:
2418                                 mutex_exit(&mir->mir_mutex);
2419                                 iocp->ioc_error = EPERM;
2420                                 iocp->ioc_count = 0;
2421                                 mp->b_datap->db_type = M_IOCACK;
2422                                 qreply(q, mp);
2423                                 return;
2424                         }
2425 
2426                         mir->mir_type = iocp->ioc_cmd;
2427 
2428                         /*
2429                          * Clear mir_hold_inbound which was set to 1 by
2430                          * mir_open.  This flag is not used on client
2431                          * streams.
2432                          */
2433                         mir->mir_hold_inbound = 0;
2434                         mir->mir_max_msg_sizep = &clnt_max_msg_size;
2435 
2436                         /*
2437                          * Start the idle timer.  See mir_timer() for more
2438                          * information on how client timers work.
2439                          */
2440                         mir->mir_idle_timeout = clnt_idle_timeout;
2441                         mir_clnt_idle_start(q, mir);
2442                         mutex_exit(&mir->mir_mutex);
2443 
2444                         mp->b_datap->db_type = M_IOCACK;
2445                         qreply(q, mp);
2446                         return;
2447                 case RPC_SERVER:
2448                         mutex_enter(&mir->mir_mutex);
2449                         if (mir->mir_type != 0 &&
2450                             mir->mir_type != iocp->ioc_cmd)
2451                                 goto ioc_eperm;
2452 
2453                         /*
2454                          * We don't clear mir_hold_inbound here because
2455                          * mir_hold_inbound is used in the flow control
2456                          * model. If we cleared it here, then we'd commit
2457                          * a small violation of that model, since the transport
2458                          * might immediately block downstream flow.
2459                          */
2460 
2461                         mir->mir_type = iocp->ioc_cmd;
2462                         mir->mir_max_msg_sizep = &svc_max_msg_size;
2463 
2464                         /*
2465                          * Start the idle timer.  See mir_timer() for more
2466                          * information on how server timers work.
2467                          *
2468                          * Note that it is important to start the idle timer
2469                          * here so that connections time out even if we
2470                          * never receive any data on them.
2471                          */
2472                         mir->mir_idle_timeout = svc_idle_timeout;
2473                         RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
2474                             "because we got RPC_SERVER ioctl\n", (void *)q);
2475                         mir_svc_idle_start(q, mir);
2476                         mutex_exit(&mir->mir_mutex);
2477 
2478                         mp->b_datap->db_type = M_IOCACK;
2479                         qreply(q, mp);
2480                         return;
2481                 default:
2482                         break;
2483                 }
2484                 break;
2485 
2486         case M_PROTO:
2487                 if (mir->mir_type == RPC_CLIENT) {
2488                         /*
2489                          * We are likely being called from the context of a
2490                          * service procedure. So we need to enqueue. However,
2491                          * enqueuing may put our message behind data messages,
2492                          * so flush the data first.
2493                          */
2494                         flush_in_svc = TRUE;
2495                 }
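		/*
		 * If the message is too short or misaligned for the TPI
		 * primitive type to be read safely, skip the switch below
		 * and pass the message downstream untouched.
		 */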
2496                 if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
2497                     !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
2498                         break;
2499 
2500                 switch (((union T_primitives *)rptr)->type) {
2501                 case T_DATA_REQ:
2502                         /* Don't pass T_DATA_REQ messages downstream. */
2503                         freemsg(mp);
2504                         return;
2505                 case T_ORDREL_REQ:
2506                         RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
2507                             (void *)q);
2508                         mutex_enter(&mir->mir_mutex);
2509                         if (mir->mir_type != RPC_SERVER) {
2510                                 /*
2511                                  * We are likely being called from
2512                                  * clnt_dispatch_notifyall(). Sending
2513                                  * a T_ORDREL_REQ will result in some
2514                                  * kind of _IND message coming back, which
2515                                  * will trigger another call to
2516                                  * clnt_dispatch_notifyall(). To keep the stack
2517                                  * lean, queue this message.
2518                                  */
2519                                 mir->mir_inwservice = 1;
2520                                 (void) putq(q, mp);
2521                                 mutex_exit(&mir->mir_mutex);
2522                                 return;
2523                         }
2524 
2525                         /*
2526                          * Mark the structure such that we don't accept any
2527                          * more requests from client. We could defer this
2528                          * until we actually send the orderly release
2529                          * request downstream, but all that does is delay
2530                          * the closing of this stream.
2531                          */
2532                         RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
2533                             "so calling mir_svc_start_close\n", (void *)q);
2534 
2535                         mir_svc_start_close(q, mir);
2536 
2537                         /*
2538                          * If we have sent down a T_ORDREL_REQ, don't send
2539                          * any more.
2540                          */
2541                         if (mir->mir_ordrel_pending) {
2542                                 freemsg(mp);
2543                                 mutex_exit(&mir->mir_mutex);
2544                                 return;
2545                         }
2546 
2547                         /*
2548                          * If the stream is not idle, then we hold the
2549                          * orderly release until it becomes idle.  This
2550                          * ensures that kRPC will be able to reply to
2551                          * all requests that we have passed to it.
2552                          *
2553                          * We also queue the request if there is data already
2554                          * queued, because we cannot allow the T_ORDREL_REQ
2555                          * to go before data. When we had a separate reply
2556                          * count, this was not a problem, because the
2557                          * reply count was reconciled when mir_wsrv()
2558                          * completed.
2559                          */
2560                         if (!MIR_SVC_QUIESCED(mir) ||
2561                             mir->mir_inwservice == 1) {
2562                                 mir->mir_inwservice = 1;
2563                                 (void) putq(q, mp);
2564 
2565                                 RPCLOG(16, "mir_wput_other: queuing "
2566                                     "T_ORDREL_REQ on 0x%p\n", (void *)q);
2567 
2568                                 mutex_exit(&mir->mir_mutex);
2569                                 return;
2570                         }
2571 
2572                         /*
2573                          * Mark the structure so that we know we sent
2574                          * an orderly release request, and reset the idle timer.
2575                          */
2576                         mir->mir_ordrel_pending = 1;
2577 
2578                         RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
2579                             " on 0x%p because we got T_ORDREL_REQ\n",
2580                             (void *)q);
2581 
2582                         mir_svc_idle_start(q, mir);
2583                         mutex_exit(&mir->mir_mutex);
2584 
2585                         /*
2586                          * When we break, we will putnext the T_ORDREL_REQ.
2587                          */
2588                         break;
2589 
2590                 case T_CONN_REQ:
2591                         mutex_enter(&mir->mir_mutex);
2592                         if (mir->mir_head_mp != NULL) {
2593                                 freemsg(mir->mir_head_mp);
2594                                 mir->mir_head_mp = NULL;
2595                                 mir->mir_tail_mp = NULL;
2596                         }
2597                         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2598                         /*
2599                          * Restart timer in case mir_clnt_idle_do_stop() was
2600                          * called.
2601                          */
2602                         mir->mir_idle_timeout = clnt_idle_timeout;
2603                         mir_clnt_idle_stop(q, mir);
2604                         mir_clnt_idle_start(q, mir);
2605                         mutex_exit(&mir->mir_mutex);
2606                         break;
2607 
2608                 default:
2609                         /*
2610                          * T_DISCON_REQ is one of the interesting default
2611                          * cases here. Ideally, an M_FLUSH is done before
2612                          * T_DISCON_REQ is done. However, that is somewhat
2613                          * cumbersome for clnt_cots.c to do. So we queue
2614                          * T_DISCON_REQ, and let the service procedure
2615                          * flush all M_DATA.
2616                          */
2617                         break;
2618                 }
2619                 /* FALLTHROUGH */
2620         default:
2621                 if (mp->b_datap->db_type >= QPCTL) {
2622                         if (mp->b_datap->db_type == M_FLUSH) {
2623                                 if (mir->mir_type == RPC_CLIENT &&
2624                                     *mp->b_rptr & FLUSHW) {
2625                                         RPCLOG(32, "mir_wput_other: flushing "
2626                                             "wq 0x%p\n", (void *)q);
2627                                         if (*mp->b_rptr & FLUSHBAND) {
2628                                                 flushband(q, *(mp->b_rptr + 1),
2629                                                     FLUSHDATA);
2630                                         } else {
2631                                                 flushq(q, FLUSHDATA);
2632                                         }
2633                                 } else {
2634                                         RPCLOG(32, "mir_wput_other: ignoring "
2635                                             "M_FLUSH on wq 0x%p\n", (void *)q);
2636                                 }
2637                         }
2638                         break;
2639                 }
2640 
2641                 mutex_enter(&mir->mir_mutex);
2642                 if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
2643                         mutex_exit(&mir->mir_mutex);
2644                         break;
2645                 }
2646                 mir->mir_inwservice = 1;
2647                 mir->mir_inwflushdata = flush_in_svc;
2648                 (void) putq(q, mp);
2649                 mutex_exit(&mir->mir_mutex);
2650                 qenable(q);
2651 
2652                 return;
2653         }
2654         putnext(q, mp);
2655 }
2656 
2657 static void
2658 mir_wsrv(queue_t *q)
2659 {
2660         mblk_t  *mp;
2661         mir_t   *mir;
2662         bool_t flushdata;
2663 
2664         mir = (mir_t *)q->q_ptr;
2665         mutex_enter(&mir->mir_mutex);
2666 
2667         flushdata = mir->mir_inwflushdata;
2668         mir->mir_inwflushdata = 0;
2669 
2670         while ((mp = getq(q)) != NULL) {
2671                 if (mp->b_datap->db_type == M_DATA) {
2672                         /*
2673                          * Do not send any more data if we have sent
2674                          * a T_ORDREL_REQ.
2675                          */
2676                         if (flushdata || mir->mir_ordrel_pending == 1) {
2677                                 freemsg(mp);
2678                                 continue;
2679                         }
2680 
2681                         /*
2682                          * Make sure that the stream can really handle more
2683                          * data.
2684                          */
2685                         if (!MIR_WCANPUTNEXT(mir, q)) {
2686                                 (void) putbq(q, mp);
2687                                 mutex_exit(&mir->mir_mutex);
2688                                 return;
2689                         }
2690 
2691                         /*
2692                          * Now we pass the RPC message downstream.
2693                          */
2694                         mutex_exit(&mir->mir_mutex);
2695                         putnext(q, mp);
2696                         mutex_enter(&mir->mir_mutex);
2697                         continue;
2698                 }
2699 
2700                 /*
2701                  * This is not an RPC message; pass it downstream
2702                  * (ignoring flow control), unless the server side is sending
2703                  * a T_ORDREL_REQ downstream.
2704                  */
2705                 if (mir->mir_type != RPC_SERVER ||
2706                     ((union T_primitives *)mp->b_rptr)->type !=
2707                     T_ORDREL_REQ) {
2708                         mutex_exit(&mir->mir_mutex);
2709                         putnext(q, mp);
2710                         mutex_enter(&mir->mir_mutex);
2711                         continue;
2712                 }
2713 
2714                 if (mir->mir_ordrel_pending == 1) {
2715                         /*
2716                          * Don't send two T_ORDREL_REQs.
2717                          */
2718                         freemsg(mp);
2719                         continue;
2720                 }
2721 
2722                 /*
2723                  * Mark the structure so that we know we sent an orderly
2724                  * release request.  We will check to see if the slot is idle
2725                  * at the end of this routine, and if so, reset the idle timer
2726                  * to handle orderly release timeouts.
2727                  */
2728                 mir->mir_ordrel_pending = 1;
2729                 RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
2730                     (void *)q);
2731                 /*
2732                  * Send the orderly release downstream. If there are other
2733                  * pending replies we won't be able to send them.  However,
2734                  * the only reason we should send the orderly release is if
2735                  * we were idle, or if an unusual event occurred.
2736                  */
2737                 mutex_exit(&mir->mir_mutex);
2738                 putnext(q, mp);
2739                 mutex_enter(&mir->mir_mutex);
2740         }
2741 
2742         if (q->q_first == NULL)
2743                 /*
2744                  * If we call mir_svc_idle_start() below, then
2745                  * clearing mir_inwservice here will also result in
2746                  * any thread waiting in mir_close() to be signaled.
2747                  */
2748                 mir->mir_inwservice = 0;
2749 
2750         if (mir->mir_type != RPC_SERVER) {
2751                 mutex_exit(&mir->mir_mutex);
2752                 return;
2753         }
2754 
2755         /*
2756          * If idle we call mir_svc_idle_start to start the timer (or wake up
2757          * a close). Also make sure not to start the idle timer on the
2758          * listener stream, since doing so could cause nfsd to send an
2759          * orderly release command on the listener stream.
2760          */
2761         if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
2762                 RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
2763                     "because mir slot is idle\n", (void *)q);
2764                 mir_svc_idle_start(q, mir);
2765         }
2766 
2767         /*
2768          * If outbound flow control has been relieved, then allow new
2769          * inbound requests to be processed.
2770          */
2771         if (mir->mir_hold_inbound) {
2772                 mir->mir_hold_inbound = 0;
2773                 qenable(RD(q));
2774         }
2775         mutex_exit(&mir->mir_mutex);
2776 }
2777 
2778 static void
2779 mir_disconnect(queue_t *q, mir_t *mir)
2780 {
2781         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2782 
2783         switch (mir->mir_type) {
2784         case RPC_CLIENT:
2785                 /*
2786                  * We are disconnecting, but not necessarily
2787                  * closing. By not closing, we will fail to
2788                  * pick up a possibly changed global timeout value,
2789                  * unless we store it now.
2790                  */
2791                 mir->mir_idle_timeout = clnt_idle_timeout;
2792                 mir_clnt_idle_start(WR(q), mir);
2793                 mutex_exit(&mir->mir_mutex);
2794 
2795                 /*
2796                  * T_DISCON_REQ is passed to kRPC as an integer value
2797                  * (this is not a TPI message).  It is used as a
2798                  * convenient value to indicate a sanity check
2799                  * failure -- the same kRPC routine is also called
2800                  * for T_DISCON_INDs and T_ORDREL_INDs.
2801                  */
2802                 clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
2803                 break;
2804 
2805         case RPC_SERVER:
2806                 mir->mir_svc_no_more_msgs = 1;
2807                 mir_svc_idle_stop(WR(q), mir);
2808                 mutex_exit(&mir->mir_mutex);
2809                 RPCLOG(16, "mir_disconnect: telling "
2810                     "stream head listener to disconnect stream "
2811                     "(0x%p)\n", (void *) q);
2812                 (void) mir_svc_policy_notify(q, 2);
2813                 break;
2814 
2815         default:
2816                 mutex_exit(&mir->mir_mutex);
2817                 break;
2818         }
2819 }
2820 
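/*
 * The limit enforced by mir_check_len() below comes from mir_max_msg_sizep,
 * which points at the global clnt_max_msg_size or svc_max_msg_size knob
 * depending on whether the stream was set up with the RPC_CLIENT or
 * RPC_SERVER ioctl (see mir_wput_other); a limit of zero disables the
 * check entirely.
 */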
2821 /*
2822  * Sanity check the message length, and if it's too large, shut down the
2823  * connection.  Returns 1 if the connection is shut down; 0 otherwise.
2824  */
2825 static int
2826 mir_check_len(queue_t *q, mblk_t *head_mp)
2827 {
2828         mir_t *mir = q->q_ptr;
2829         uint_t maxsize = 0;
2830         size_t msg_len = msgdsize(head_mp);
2831 
2832         if (mir->mir_max_msg_sizep != NULL)
2833                 maxsize = *mir->mir_max_msg_sizep;
2834 
2835         if (maxsize == 0 || msg_len <= maxsize)
2836                 return (0);
2837 
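	/*
	 * The message is too large: toss it, reset the fragment-reassembly
	 * state, and tear the connection down.
	 */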
2838         freemsg(head_mp);
2839         mir->mir_head_mp = NULL;
2840         mir->mir_tail_mp = NULL;
2841         mir->mir_frag_header = 0;
2842         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2843         if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
2844                 cmn_err(CE_NOTE,
2845                     "kRPC: record fragment from %s of size (%lu) exceeds "
2846                     "maximum (%u). Disconnecting",
2847                     (mir->mir_type == RPC_CLIENT) ? "server" :
2848                     (mir->mir_type == RPC_SERVER) ? "client" :
2849                     "test tool", msg_len, maxsize);
2850         }
2851 
2852         mir_disconnect(q, mir);
2853         return (1);
2854 }