1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /*
  26  * Copyright 2012 Milan Jurik. All rights reserved.
  27  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  28  */
  29 /* Copyright (c) 1990 Mentat Inc. */
  30 
  31 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  32 /*        All Rights Reserved   */
  33 
  34 /*
  35  * Kernel RPC filtering module
  36  */
  37 
  38 #include <sys/param.h>
  39 #include <sys/types.h>
  40 #include <sys/stream.h>
  41 #include <sys/stropts.h>
  42 #include <sys/strsubr.h>
  43 #include <sys/tihdr.h>
  44 #include <sys/timod.h>
  45 #include <sys/tiuser.h>
  46 #include <sys/debug.h>
  47 #include <sys/signal.h>
  48 #include <sys/pcb.h>
  49 #include <sys/user.h>
  50 #include <sys/errno.h>
  51 #include <sys/cred.h>
  52 #include <sys/policy.h>
  53 #include <sys/inline.h>
  54 #include <sys/cmn_err.h>
  55 #include <sys/kmem.h>
  56 #include <sys/file.h>
  57 #include <sys/sysmacros.h>
  58 #include <sys/systm.h>
  59 #include <sys/t_lock.h>
  60 #include <sys/ddi.h>
  61 #include <sys/vtrace.h>
  62 #include <sys/callb.h>
  63 #include <sys/strsun.h>
  64 
  65 #include <sys/strlog.h>
  66 #include <rpc/rpc_com.h>
  67 #include <inet/common.h>
  68 #include <rpc/types.h>
  69 #include <sys/time.h>
  70 #include <rpc/xdr.h>
  71 #include <rpc/auth.h>
  72 #include <rpc/clnt.h>
  73 #include <rpc/rpc_msg.h>
  74 #include <rpc/clnt.h>
  75 #include <rpc/svc.h>
  76 #include <rpc/rpcsys.h>
  77 #include <rpc/rpc_rdma.h>
  78 
  79 /*
  80  * This is the loadable module wrapper.
  81  */
  82 #include <sys/conf.h>
  83 #include <sys/modctl.h>
  84 #include <sys/syscall.h>
  85 
/*
 * STREAMS module switch entry for rpcmod.  rpcinfo (the streamtab that
 * holds the read and write qinit structures) is defined later in this file.
 */
extern struct streamtab rpcinfo;

static struct fmodsw fsw = {
	"rpcmod",
	&rpcinfo,
	D_NEW|D_MP,
};

/*
 * Module linkage information for the kernel.
 */

/* Linkage for the "rpcmod" STREAMS module described by fsw. */
static struct modlstrmod modlstrmod = {
	&mod_strmodops, "rpc interface str mod", &fsw
};

/*
 * For the RPC system call: a sysent entry whose handler is rpcsys().
 * NOTE(review): the leading 2 appears to be the argument count; confirm
 * against the struct sysent definition.
 */
static struct sysent rpcsysent = {
	2,
	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
	rpcsys
};

/* Linkage for the native RPC system call entry. */
static struct modlsys modlsys = {
	&mod_syscallops,
	"RPC syscall",
	&rpcsysent
};

#ifdef _SYSCALL32_IMPL
/* Linkage for the 32-bit RPC system call entry; shares rpcsysent. */
static struct modlsys modlsys32 = {
	&mod_syscallops32,
	"32-bit RPC syscall",
	&rpcsysent
};
#endif /* _SYSCALL32_IMPL */

/*
 * Everything this module installs: the RPC system call (native and, when
 * _SYSCALL32_IMPL is defined, 32-bit) and the rpcmod STREAMS module.
 * _init() also passes this to ldi_ident_from_mod().
 */
static struct modlinkage modlinkage = {
	MODREV_1,
	{
		&modlsys,
#ifdef _SYSCALL32_IMPL
		&modlsys32,
#endif
		&modlstrmod,
		NULL
	}
};
 136 
 137 int
 138 _init(void)
 139 {
 140         int error = 0;
 141         callb_id_t cid;
 142         int status;
 143 
 144         svc_init();
 145         clnt_init();
 146         cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
 147 
 148         if (error = mod_install(&modlinkage)) {
 149                 /*
 150                  * Could not install module, cleanup previous
 151                  * initialization work.
 152                  */
 153                 clnt_fini();
 154                 if (cid != NULL)
 155                         (void) callb_delete(cid);
 156 
 157                 return (error);
 158         }
 159 
 160         /*
 161          * Load up the RDMA plugins and initialize the stats. Even if the
 162          * plugins loadup fails, but rpcmod was successfully installed the
 163          * counters still get initialized.
 164          */
 165         rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
 166         mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
 167 
 168         cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
 169         mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);
 170 
 171         mt_kstat_init();
 172 
 173         /*
 174          * Get our identification into ldi.  This is used for loading
 175          * other modules, e.g. rpcib.
 176          */
 177         status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
 178         if (status != 0) {
 179                 cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
 180                 rpcmod_li = NULL;
 181         }
 182 
 183         return (error);
 184 }
 185 
/*
 * _fini - module unload entry point.
 *
 * Always refuses to unload by returning EBUSY.  Other parts of kRPC hold
 * entry points into rpcmod, e.g. rpcmod_release(), which rpcmodopen()
 * publishes through the global rpc_rele hook; unloading would leave those
 * pointers dangling.
 */
int
_fini(void)
{
	return (EBUSY);
}
 195 
/*
 * _info - module information entry point.
 *
 * Reports information about this module's linkage (modlinkage) by
 * delegating to mod_info(), and returns its result.
 */
int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}
 201 
extern int nulldev();

/* STREAMS module ID number for rpcmod; used in rpcmod_info below. */
#define	RPCMOD_ID	2049

/*
 * Top-level STREAMS open/close entry points.  Both are shared by the read
 * and write qinit structures below.
 */
int rmm_open(queue_t *, dev_t *, int, int, cred_t *);
int rmm_close(queue_t *, int, cred_t *);

/*
 * To save instructions, since STREAMS ignores the return value
 * from these functions, they are defined as void here. Kind of icky, but...
 * They are cast to int (*)() where they are stored in the qinit tables.
 */
void rmm_rput(queue_t *, mblk_t *);
void rmm_wput(queue_t *, mblk_t *);
void rmm_rsrv(queue_t *);
void rmm_wsrv(queue_t *);

/* Routines used for connectionless (T_CLTS) transports. */
int rpcmodopen(queue_t *, dev_t *, int, int, cred_t *);
int rpcmodclose(queue_t *, int, cred_t *);
void rpcmodrput(queue_t *, mblk_t *);
void rpcmodwput(queue_t *, mblk_t *);
void rpcmodrsrv();
void rpcmodwsrv(queue_t *);

/* Routines used for all other (connection-oriented) transports. */
static	void	rpcmodwput_other(queue_t *, mblk_t *);
static	int	mir_close(queue_t *q);
static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
		    cred_t *credp);
static	void	mir_rput(queue_t *q, mblk_t *mp);
static	void	mir_rsrv(queue_t *q);
static	void	mir_wput(queue_t *q, mblk_t *mp);
static	void	mir_wsrv(queue_t *q);

/*
 * Module info, in STREAMS module_info field order: module ID, module name,
 * minimum packet size 0, maximum packet size INFPSZ, high-water mark
 * 256 KB and low-water mark 1 KB.
 */
static struct module_info rpcmod_info =
	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};

/*
 * Read side.  rmm_rput/rmm_rsrv dispatch through the ops vector of the
 * per-stream structure on q_ptr.  No admin routine (nulldev), no stats.
 */
static struct qinit rpcmodrinit = {
	(int (*)())rmm_rput,
	(int (*)())rmm_rsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};

/*
 * Write side.  rmm_wput/rmm_wsrv also dispatch through the per-stream ops
 * vector.  For connectionless streams, rpcmodwput() sends M_PROTO and
 * M_PCPROTO messages downstream with putnext() when flow control allows
 * (or when the slot is closing), and otherwise putq()s them for the
 * service procedure.
 */
static struct qinit rpcmodwinit = {
	(int (*)())rmm_wput,
	(int (*)())rmm_wsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
/* The streamtab referenced by fsw: read qinit, write qinit, no mux. */
struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
 262 
/*
 * Per-transport-style dispatch vector.  rmm_open() picks one of these
 * according to the transport's service type.  The rmm_* STREAMS entry
 * points then call through it for every open/close/put/service event.
 */
struct xprt_style_ops {
	int (*xo_open)();
	int (*xo_close)();
	void (*xo_wput)();
	void (*xo_wsrv)();
	void (*xo_rput)();
	void (*xo_rsrv)();
};

/*
 * Connectionless (T_CLTS) streams.
 * Read side has no service procedure (xo_rsrv is NULL).
 * NOTE(review): rmm_rsrv() calls xo_rsrv unconditionally, so this relies
 * on the CLTS read side never scheduling its service procedure; confirm.
 */
static struct xprt_style_ops xprt_clts_ops = {
	rpcmodopen,
	rpcmodclose,
	rpcmodwput,
	rpcmodwsrv,
	rpcmodrput,
	NULL
};

/* All non-T_CLTS (connection-oriented) streams: the mir_* routines. */
static struct xprt_style_ops xprt_cots_ops = {
	mir_open,
	mir_close,
	mir_wput,
	mir_wsrv,
	mir_rput,
	mir_rsrv
};
 292 
/*
 * Per rpcmod "slot" data structure for connectionless (T_CLTS) streams.
 * q->q_ptr points to one of these.  It is allocated by rpcmodopen() and
 * freed by rpcmodclose().
 *
 * The first two members must stay in step with struct temp_slot and mir_t.
 * The rmm_* entry points view q_ptr as a struct temp_slot to reach the
 * ops vector.
 */
struct rpcm {
	void		*rm_krpc_cell;	/* Reserved for use by kRPC */
	struct		xprt_style_ops	*rm_ops;
	int		rm_type;	/* Client or server side stream */
#define	RM_CLOSING	0x1		/* somebody is trying to close slot */
	uint_t		rm_state;	/* state of the slot. see above */
	uint_t		rm_ref;		/* cnt of external references to slot */
	kmutex_t	rm_lock;	/* mutex protecting above fields */
	kcondvar_t	rm_cwait;	/* condition for closing */
	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
};
 307 
/*
 * Temporary per-stream state used only while rmm_open() waits for the
 * transport's T_INFO_ACK.  It lives on rmm_open()'s stack and is hung off
 * q_ptr for that duration.  cell and ops mirror the first two members of
 * struct rpcm and mir_t so the rmm_* entry points can treat all three
 * alike.
 */
struct temp_slot {
	void *cell;
	struct xprt_style_ops *ops;	/* always &tmpops while in use */
	int type;
	mblk_t *info_ack;		/* T_INFO_ACK stashed by tmp_rput() */
	kmutex_t lock;			/* protects info_ack */
	kcondvar_t wait;		/* signalled when info_ack arrives */
};
 316 
/*
 * Per-stream state for connection-oriented (non-T_CLTS) transports.
 * rmm_open() hands such streams to mir_open() and then sets rm_ops through
 * q_ptr.  Presumably mir_open() allocates one of these and hangs it off
 * q_ptr (not visible here; confirm).  The first two members must line up
 * with struct temp_slot and struct rpcm, because the rmm_* entry points
 * reach the ops vector by viewing q_ptr as a struct temp_slot.
 */
typedef struct mir_s {
	void	*mir_krpc_cell;	/* Reserved for kRPC use. This field */
					/* must be first in the structure. */
	struct xprt_style_ops	*rm_ops;
	int	mir_type;		/* Client or server side stream */

	mblk_t	*mir_head_mp;		/* RPC msg in progress */
		/*
		 * mir_head_mp points to the first mblk being collected in
		 * the current RPC message.  Record headers are removed
		 * before data is linked into mir_head_mp.
		 */
	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
		/*
		 * mir_tail_mp points to the last mblk in the message
		 * chain starting at mir_head_mp.  It is only valid
		 * if mir_head_mp is non-NULL and is used to add new
		 * data blocks to the end of chain quickly.
		 */

	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
		/*
		 * mir_frag_len starts at -4 for beginning of each fragment.
		 * When this length is negative, it indicates the number of
		 * bytes that rpcmod needs to complete the record marker
		 * header.  When it is positive or zero, it holds the number
		 * of bytes that have arrived for the current fragment and
		 * are held in mir_head_mp.
		 */

	int32_t	mir_frag_header;
		/*
		 * Fragment header as collected for the current fragment.
		 * It holds the last-fragment indicator and the number
		 * of bytes in the fragment.
		 */

	unsigned int
		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
		mir_hold_inbound : 1,	/* Hold inbound messages on server */
					/* side until outbound flow control */
					/* is relieved. */
		mir_closing : 1,	/* The stream is being closed */
		mir_inrservice : 1,	/* data queued or rd srv proc running */
		mir_inwservice : 1,	/* data queued or wr srv proc running */
		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
		/*
		 * On client streams, mir_clntreq is 0 or 1; it is set
		 * to 1 whenever a new request is sent out (mir_wput)
		 * and cleared when the timer fires (mir_timer).  If
		 * the timer fires with this value equal to 0, then the
		 * stream is considered idle and kRPC is notified.
		 */
		mir_clntreq : 1,
		/*
		 * On server streams, stop accepting messages
		 */
		mir_svc_no_more_msgs : 1,
		mir_listen_stream : 1,	/* listen end point */
		mir_unused : 1,	/* no longer used */
		mir_timer_call : 1,
		mir_junk_fill_thru_bit_31 : 21;	/* pads the 11 flags to 32 bits */

	int	mir_setup_complete;	/* server has initialized everything */
	timeout_id_t mir_timer_id;	/* Timer for idle checks */
	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
		/*
		 * This value is copied from clnt_idle_timeout or
		 * svc_idle_timeout during the appropriate ioctl.
		 * Kept in milliseconds
		 */
	clock_t	mir_use_timestamp;	/* updated on client with each use */
		/*
		 * This value is set to lbolt
		 * every time a client stream sends or receives data.
		 * Even if the timer message arrives, we don't shutdown
		 * client unless:
		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
		 * This value is kept in clock ticks (lbolt units).
		 */

	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
		/*
		 * This pointer is set to &clnt_max_msg_size or
		 * &svc_max_msg_size during the appropriate ioctl.
		 */
	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
	/* Server-side fields. */
	int	mir_ref_cnt;		/* Reference count: server side only */
					/* counts the number of references */
					/* that a kernel RPC server thread */
					/* (see svc_run()) has on this rpcmod */
					/* slot. Effectively, it is the */
					/* number of unprocessed messages */
					/* that have been passed up to the */
					/* kRPC layer */

	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
					/* T_DISCON_IND */

	/*
	 * these fields are for both client and server, but for debugging,
	 * it is easier to have these last in the structure.
	 */
	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
	kcondvar_t	mir_condvar;	/* synchronization. */
	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
} mir_t;
 425 
void tmp_rput(queue_t *q, mblk_t *mp);

/*
 * Ops vector installed (through a struct temp_slot) while rmm_open() waits
 * for the T_INFO_ACK.  Writes go straight downstream via putnext(), and
 * reads are screened by tmp_rput().  The open, close and service slots are
 * NULL; nothing is expected to call them during this window.
 */
struct xprt_style_ops tmpops = {
	NULL,
	NULL,
	putnext,
	NULL,
	tmp_rput,
	NULL
};
 436 
 437 void
 438 tmp_rput(queue_t *q, mblk_t *mp)
 439 {
 440         struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
 441         struct T_info_ack *pptr;
 442 
 443         switch (mp->b_datap->db_type) {
 444         case M_PCPROTO:
 445                 pptr = (struct T_info_ack *)mp->b_rptr;
 446                 switch (pptr->PRIM_type) {
 447                 case T_INFO_ACK:
 448                         mutex_enter(&t->lock);
 449                         t->info_ack = mp;
 450                         cv_signal(&t->wait);
 451                         mutex_exit(&t->lock);
 452                         return;
 453                 default:
 454                         break;
 455                 }
 456         default:
 457                 break;
 458         }
 459 
 460         /*
 461          * Not an info-ack, so free it. This is ok because we should
 462          * not be receiving data until the open finishes: rpcmod
 463          * is pushed well before the end-point is bound to an address.
 464          */
 465         freemsg(mp);
 466 }
 467 
 468 int
 469 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 470 {
 471         mblk_t *bp;
 472         struct temp_slot ts, *t;
 473         struct T_info_ack *pptr;
 474         int error = 0;
 475 
 476         ASSERT(q != NULL);
 477         /*
 478          * Check for re-opens.
 479          */
 480         if (q->q_ptr) {
 481                 TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
 482                     "rpcmodopen_end:(%s)", "q->qptr");
 483                 return (0);
 484         }
 485 
 486         t = &ts;
 487         bzero(t, sizeof (*t));
 488         q->q_ptr = (void *)t;
 489         WR(q)->q_ptr = (void *)t;
 490 
 491         /*
 492          * Allocate the required messages upfront.
 493          */
 494         if ((bp = allocb_cred(sizeof (struct T_info_req) +
 495             sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
 496                 return (ENOBUFS);
 497         }
 498 
 499         mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
 500         cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
 501 
 502         t->ops = &tmpops;
 503 
 504         qprocson(q);
 505         bp->b_datap->db_type = M_PCPROTO;
 506         *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
 507         bp->b_wptr += sizeof (struct T_info_req);
 508         putnext(WR(q), bp);
 509 
 510         mutex_enter(&t->lock);
 511         while (t->info_ack == NULL) {
 512                 if (cv_wait_sig(&t->wait, &t->lock) == 0) {
 513                         error = EINTR;
 514                         break;
 515                 }
 516         }
 517         mutex_exit(&t->lock);
 518 
 519         if (error)
 520                 goto out;
 521 
 522         pptr = (struct T_info_ack *)t->info_ack->b_rptr;
 523 
 524         if (pptr->SERV_type == T_CLTS) {
 525                 if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
 526                         ((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
 527         } else {
 528                 if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
 529                         ((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
 530         }
 531 
 532 out:
 533         if (error)
 534                 qprocsoff(q);
 535 
 536         freemsg(t->info_ack);
 537         mutex_destroy(&t->lock);
 538         cv_destroy(&t->wait);
 539 
 540         return (error);
 541 }
 542 
 543 void
 544 rmm_rput(queue_t *q, mblk_t  *mp)
 545 {
 546         (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
 547 }
 548 
 549 void
 550 rmm_rsrv(queue_t *q)
 551 {
 552         (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
 553 }
 554 
 555 void
 556 rmm_wput(queue_t *q, mblk_t *mp)
 557 {
 558         (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
 559 }
 560 
 561 void
 562 rmm_wsrv(queue_t *q)
 563 {
 564         (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
 565 }
 566 
 567 int
 568 rmm_close(queue_t *q, int flag, cred_t *crp)
 569 {
 570         return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
 571 }
 572 
 573 static void rpcmod_release(queue_t *, mblk_t *, bool_t);
 574 /*
 575  * rpcmodopen - open routine gets called when the module gets pushed
 576  *              onto the stream.
 577  */
 578 /*ARGSUSED*/
 579 int
 580 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 581 {
 582         struct rpcm *rmp;
 583 
 584         extern void (*rpc_rele)(queue_t *, mblk_t *, bool_t);
 585 
 586         TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
 587 
 588         /*
 589          * Initialize entry points to release a rpcmod slot (and an input
 590          * message if supplied) and to send an output message to the module
 591          * below rpcmod.
 592          */
 593         if (rpc_rele == NULL)
 594                 rpc_rele = rpcmod_release;
 595 
 596         /*
 597          * Only sufficiently privileged users can use this module, and it
 598          * is assumed that they will use this module properly, and NOT send
 599          * bulk data from downstream.
 600          */
 601         if (secpolicy_rpcmod_open(crp) != 0)
 602                 return (EPERM);
 603 
 604         /*
 605          * Allocate slot data structure.
 606          */
 607         rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
 608 
 609         mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
 610         cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
 611         rmp->rm_zoneid = rpc_zoneid();
 612         /*
 613          * slot type will be set by kRPC client and server ioctl's
 614          */
 615         rmp->rm_type = 0;
 616 
 617         q->q_ptr = (void *)rmp;
 618         WR(q)->q_ptr = (void *)rmp;
 619 
 620         TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
 621         return (0);
 622 }
 623 
/*
 * rpcmodclose - This routine gets called when the module gets popped
 * off of a connectionless (T_CLTS) stream.
 *
 * Marks the slot RM_CLOSING.  For a server slot it then:
 * - flushes read-side data and enables the write queue;
 * - asks kRPC to clean its queue;
 * - waits until every outstanding kRPC reference (rm_ref) is dropped;
 * - turns off the put procedures and notifies kRPC with
 *   svc_queueclose().
 * For any other slot type it just turns off the put procedures.  Finally
 * the slot is detached from both queues and freed.  Always returns 0.
 */
/*ARGSUSED*/
int
rpcmodclose(queue_t *q, int flag, cred_t *crp)
{
	struct rpcm *rmp;

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	/*
	 * Mark our state as closing.  From here on rpcmodwput() sends
	 * messages downstream even when flow controlled, and rpcmodrput()
	 * no longer hands T_UNITDATA_INDs to kRPC.
	 */
	mutex_enter(&rmp->rm_lock);
	rmp->rm_state |= RM_CLOSING;

	/*
	 * For a server slot, discard data queued on the read side and enable
	 * the write queue so its service procedure runs.
	 * NOTE(review): presumably the write service procedure then sends any
	 * queued messages regardless of downstream flow control; confirm in
	 * rpcmodwsrv().
	 */
	if (rmp->rm_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);

		qenable(WR(q));

		if (rmp->rm_ref) {
			mutex_exit(&rmp->rm_lock);
			/*
			 * call into SVC to clean the queue
			 */
			svc_queueclean(q);
			mutex_enter(&rmp->rm_lock);

			/*
			 * Block while there are kRPC threads with a reference
			 * to this slot (rm_ref, raised by rpcmodrput() for
			 * each request queued to kRPC).
			 */
			while (rmp->rm_ref)
				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
		}

		mutex_exit(&rmp->rm_lock);

		/*
		 * It is now safe to remove this queue from the stream. No kRPC
		 * threads have a reference to the stream, and none ever will,
		 * because RM_CLOSING is set.
		 */
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&rmp->rm_lock);
		qprocsoff(q);
	}

	q->q_ptr = NULL;
	WR(q)->q_ptr = NULL;
	mutex_destroy(&rmp->rm_lock);
	cv_destroy(&rmp->rm_cwait);
	kmem_free(rmp, sizeof (*rmp));
	return (0);
}
 692 
 693 /*
 694  * rpcmodrput - Module read put procedure.  This is called from
 695  *              the module, driver, or stream head downstream.
 696  */
 697 void
 698 rpcmodrput(queue_t *q, mblk_t *mp)
 699 {
 700         struct rpcm *rmp;
 701         union T_primitives *pptr;
 702         int hdrsz;
 703 
 704         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
 705 
 706         ASSERT(q != NULL);
 707         rmp = (struct rpcm *)q->q_ptr;
 708 
 709         if (rmp->rm_type == 0) {
 710                 freemsg(mp);
 711                 return;
 712         }
 713 
 714         switch (mp->b_datap->db_type) {
 715         default:
 716                 putnext(q, mp);
 717                 break;
 718 
 719         case M_PROTO:
 720         case M_PCPROTO:
 721                 ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
 722                 pptr = (union T_primitives *)mp->b_rptr;
 723 
 724                 /*
 725                  * Forward this message to kRPC if it is data.
 726                  */
 727                 if (pptr->type == T_UNITDATA_IND) {
 728                         /*
 729                          * Check if the module is being popped.
 730                          */
 731                         mutex_enter(&rmp->rm_lock);
 732                         if (rmp->rm_state & RM_CLOSING) {
 733                                 mutex_exit(&rmp->rm_lock);
 734                                 putnext(q, mp);
 735                                 break;
 736                         }
 737 
 738                         switch (rmp->rm_type) {
 739                         case RPC_CLIENT:
 740                                 mutex_exit(&rmp->rm_lock);
 741                                 hdrsz = mp->b_wptr - mp->b_rptr;
 742 
 743                                 /*
 744                                  * Make sure the header is sane.
 745                                  */
 746                                 if (hdrsz < TUNITDATAINDSZ ||
 747                                     hdrsz < (pptr->unitdata_ind.OPT_length +
 748                                     pptr->unitdata_ind.OPT_offset) ||
 749                                     hdrsz < (pptr->unitdata_ind.SRC_length +
 750                                     pptr->unitdata_ind.SRC_offset)) {
 751                                         freemsg(mp);
 752                                         return;
 753                                 }
 754 
 755                                 /*
 756                                  * Call clnt_clts_dispatch_notify, so that it
 757                                  * can pass the message to the proper caller.
 758                                  * Don't discard the header just yet since the
 759                                  * client may need the sender's address.
 760                                  */
 761                                 clnt_clts_dispatch_notify(mp, hdrsz,
 762                                     rmp->rm_zoneid);
 763                                 return;
 764                         case RPC_SERVER:
 765                                 /*
 766                                  * rm_krpc_cell is exclusively used by the kRPC
 767                                  * CLTS server. Try to submit the message to
 768                                  * kRPC. Since this is an unreliable channel, we
 769                                  * can just free the message in case the kRPC
 770                                  * does not accept new messages.
 771                                  */
 772                                 if (rmp->rm_krpc_cell &&
 773                                     svc_queuereq(q, mp, TRUE)) {
 774                                         /*
 775                                          * Raise the reference count on this
 776                                          * module to prevent it from being
 777                                          * popped before kRPC generates the
 778                                          * reply.
 779                                          */
 780                                         rmp->rm_ref++;
 781                                         mutex_exit(&rmp->rm_lock);
 782                                 } else {
 783                                         mutex_exit(&rmp->rm_lock);
 784                                         freemsg(mp);
 785                                 }
 786                                 return;
 787                         default:
 788                                 mutex_exit(&rmp->rm_lock);
 789                                 freemsg(mp);
 790                                 return;
 791                         } /* end switch(rmp->rm_type) */
 792                 } else if (pptr->type == T_UDERROR_IND) {
 793                         mutex_enter(&rmp->rm_lock);
 794                         hdrsz = mp->b_wptr - mp->b_rptr;
 795 
 796                         /*
 797                          * Make sure the header is sane
 798                          */
 799                         if (hdrsz < TUDERRORINDSZ ||
 800                             hdrsz < (pptr->uderror_ind.OPT_length +
 801                             pptr->uderror_ind.OPT_offset) ||
 802                             hdrsz < (pptr->uderror_ind.DEST_length +
 803                             pptr->uderror_ind.DEST_offset)) {
 804                                 mutex_exit(&rmp->rm_lock);
 805                                 freemsg(mp);
 806                                 return;
 807                         }
 808 
 809                         /*
 810                          * In the case where a unit data error has been
 811                          * received, all we need to do is clear the message from
 812                          * the queue.
 813                          */
 814                         mutex_exit(&rmp->rm_lock);
 815                         freemsg(mp);
 816                         RPCLOG(32, "rpcmodrput: unitdata error received at "
 817                             "%ld\n", gethrestime_sec());
 818                         return;
 819                 } /* end else if (pptr->type == T_UDERROR_IND) */
 820 
 821                 putnext(q, mp);
 822                 break;
 823         } /* end switch (mp->b_datap->db_type) */
 824 
 825         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
 826             "rpcmodrput_end:");
 827         /*
 828          * Return codes are not looked at by the STREAMS framework.
 829          */
 830 }
 831 
 832 /*
 833  * write put procedure
 834  */
 835 void
 836 rpcmodwput(queue_t *q, mblk_t *mp)
 837 {
 838         struct rpcm     *rmp;
 839 
 840         ASSERT(q != NULL);
 841 
 842         switch (mp->b_datap->db_type) {
 843                 case M_PROTO:
 844                 case M_PCPROTO:
 845                         break;
 846                 default:
 847                         rpcmodwput_other(q, mp);
 848                         return;
 849         }
 850 
 851         /*
 852          * Check to see if we can send the message downstream.
 853          */
 854         if (canputnext(q)) {
 855                 putnext(q, mp);
 856                 return;
 857         }
 858 
 859         rmp = (struct rpcm *)q->q_ptr;
 860         ASSERT(rmp != NULL);
 861 
 862         /*
 863          * The first canputnext failed.  Try again except this time with the
 864          * lock held, so that we can check the state of the stream to see if
 865          * it is closing.  If either of these conditions evaluate to true
 866          * then send the meesage.
 867          */
 868         mutex_enter(&rmp->rm_lock);
 869         if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
 870                 mutex_exit(&rmp->rm_lock);
 871                 putnext(q, mp);
 872         } else {
 873                 /*
 874                  * canputnext failed again and the stream is not closing.
 875                  * Place the message on the queue and let the service
 876                  * procedure handle the message.
 877                  */
 878                 mutex_exit(&rmp->rm_lock);
 879                 (void) putq(q, mp);
 880         }
 881 }
 882 
 883 static void
 884 rpcmodwput_other(queue_t *q, mblk_t *mp)
 885 {
 886         struct rpcm     *rmp;
 887         struct iocblk   *iocp;
 888 
 889         rmp = (struct rpcm *)q->q_ptr;
 890         ASSERT(rmp != NULL);
 891 
 892         switch (mp->b_datap->db_type) {
 893                 case M_IOCTL:
 894                         iocp = (struct iocblk *)mp->b_rptr;
 895                         ASSERT(iocp != NULL);
 896                         switch (iocp->ioc_cmd) {
 897                                 case RPC_CLIENT:
 898                                 case RPC_SERVER:
 899                                         mutex_enter(&rmp->rm_lock);
 900                                         rmp->rm_type = iocp->ioc_cmd;
 901                                         mutex_exit(&rmp->rm_lock);
 902                                         mp->b_datap->db_type = M_IOCACK;
 903                                         qreply(q, mp);
 904                                         return;
 905                                 default:
 906                                 /*
 907                                  * pass the ioctl downstream and hope someone
 908                                  * down there knows how to handle it.
 909                                  */
 910                                         putnext(q, mp);
 911                                         return;
 912                         }
 913                 default:
 914                         break;
 915         }
 916         /*
 917          * This is something we definitely do not know how to handle, just
 918          * pass the message downstream
 919          */
 920         putnext(q, mp);
 921 }
 922 
 923 /*
 924  * Module write service procedure. This is called by downstream modules
 925  * for back enabling during flow control.
 926  */
 927 void
 928 rpcmodwsrv(queue_t *q)
 929 {
 930         struct rpcm     *rmp;
 931         mblk_t          *mp = NULL;
 932 
 933         rmp = (struct rpcm *)q->q_ptr;
 934         ASSERT(rmp != NULL);
 935 
 936         /*
 937          * Get messages that may be queued and send them down stream
 938          */
 939         while ((mp = getq(q)) != NULL) {
 940                 /*
 941                  * Optimize the service procedure for the server-side, by
 942                  * avoiding a call to canputnext().
 943                  */
 944                 if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
 945                         putnext(q, mp);
 946                         continue;
 947                 }
 948                 (void) putbq(q, mp);
 949                 return;
 950         }
 951 }
 952 
 953 /* ARGSUSED */
 954 static void
 955 rpcmod_release(queue_t *q, mblk_t *bp, bool_t enable)
 956 {
 957         struct rpcm *rmp;
 958 
 959         /*
 960          * For now, just free the message.
 961          */
 962         if (bp)
 963                 freemsg(bp);
 964         rmp = (struct rpcm *)q->q_ptr;
 965 
 966         mutex_enter(&rmp->rm_lock);
 967         rmp->rm_ref--;
 968 
 969         if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
 970                 cv_broadcast(&rmp->rm_cwait);
 971         }
 972 
 973         mutex_exit(&rmp->rm_lock);
 974 }
 975 
 976 /*
 977  * This part of rpcmod is pushed on a connection-oriented transport for use
 978  * by RPC.  It serves to bypass the Stream head, implements
 979  * the record marking protocol, and dispatches incoming RPC messages.
 980  */
 981 
 982 /* Default idle timer values */
 983 #define MIR_CLNT_IDLE_TIMEOUT   (5 * (60 * 1000L))      /* 5 minutes */
 984 #define MIR_SVC_IDLE_TIMEOUT    (6 * (60 * 1000L))      /* 6 minutes */
 985 #define MIR_SVC_ORDREL_TIMEOUT  (10 * (60 * 1000L))     /* 10 minutes */
 986 #define MIR_LASTFRAG    0x80000000      /* Record marker */
 987 
 988 #define MIR_SVC_QUIESCED(mir)   \
 989         (mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)
 990 
 991 #define MIR_CLEAR_INRSRV(mir_ptr)       {       \
 992         (mir_ptr)->mir_inrservice = 0;       \
 993         if ((mir_ptr)->mir_type == RPC_SERVER &&     \
 994                 (mir_ptr)->mir_closing)      \
 995                 cv_signal(&(mir_ptr)->mir_condvar);      \
 996 }
 997 
 998 /*
 999  * Don't block service procedure (and mir_close) if
1000  * we are in the process of closing.
1001  */
1002 #define MIR_WCANPUTNEXT(mir_ptr, write_q)       \
1003         (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
1004 
1005 static int      mir_clnt_dup_request(queue_t *q, mblk_t *mp);
1006 static void     mir_rput_proto(queue_t *q, mblk_t *mp);
1007 static int      mir_svc_policy_notify(queue_t *q, int event);
1008 static void     mir_svc_release(queue_t *wq, mblk_t *mp, bool_t);
1009 static void     mir_svc_start(queue_t *wq);
1010 static void     mir_svc_idle_start(queue_t *, mir_t *);
1011 static void     mir_svc_idle_stop(queue_t *, mir_t *);
1012 static void     mir_svc_start_close(queue_t *, mir_t *);
1013 static void     mir_clnt_idle_do_stop(queue_t *);
1014 static void     mir_clnt_idle_stop(queue_t *, mir_t *);
1015 static void     mir_clnt_idle_start(queue_t *, mir_t *);
1016 static void     mir_wput(queue_t *q, mblk_t *mp);
1017 static void     mir_wput_other(queue_t *q, mblk_t *mp);
1018 static void     mir_wsrv(queue_t *q);
1019 static  void    mir_disconnect(queue_t *, mir_t *ir);
1020 static  int     mir_check_len(queue_t *, mblk_t *);
1021 static  void    mir_timer(void *);
1022 
1023 extern void     (*mir_rele)(queue_t *, mblk_t *, bool_t);
1024 extern void     (*mir_start)(queue_t *);
1025 extern void     (*clnt_stop_idle)(queue_t *);
1026 
1027 clock_t clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
1028 clock_t svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
1029 
1030 /*
1031  * Timeout for subsequent notifications of idle connection.  This is
1032  * typically used to clean up after a wedged orderly release.
1033  */
1034 clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
1035 
1036 extern  uint_t  *clnt_max_msg_sizep;
1037 extern  uint_t  *svc_max_msg_sizep;
1038 uint_t  clnt_max_msg_size = RPC_MAXDATASIZE;
1039 uint_t  svc_max_msg_size = RPC_MAXDATASIZE;
1040 uint_t  mir_krpc_cell_null;
1041 
1042 static void
1043 mir_timer_stop(mir_t *mir)
1044 {
1045         timeout_id_t tid;
1046 
1047         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1048 
1049         /*
1050          * Since the mir_mutex lock needs to be released to call
1051          * untimeout(), we need to make sure that no other thread
1052          * can start/stop the timer (changing mir_timer_id) during
1053          * that time.  The mir_timer_call bit and the mir_timer_cv
1054          * condition variable are used to synchronize this.  Setting
1055          * mir_timer_call also tells mir_timer() (refer to the comments
1056          * in mir_timer()) that it does not need to do anything.
1057          */
1058         while (mir->mir_timer_call)
1059                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1060         mir->mir_timer_call = B_TRUE;
1061 
1062         if ((tid = mir->mir_timer_id) != 0) {
1063                 mir->mir_timer_id = 0;
1064                 mutex_exit(&mir->mir_mutex);
1065                 (void) untimeout(tid);
1066                 mutex_enter(&mir->mir_mutex);
1067         }
1068         mir->mir_timer_call = B_FALSE;
1069         cv_broadcast(&mir->mir_timer_cv);
1070 }
1071 
1072 static void
1073 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
1074 {
1075         timeout_id_t tid;
1076 
1077         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1078 
1079         while (mir->mir_timer_call)
1080                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1081         mir->mir_timer_call = B_TRUE;
1082 
1083         if ((tid = mir->mir_timer_id) != 0) {
1084                 mutex_exit(&mir->mir_mutex);
1085                 (void) untimeout(tid);
1086                 mutex_enter(&mir->mir_mutex);
1087         }
1088         /* Only start the timer when it is not closing. */
1089         if (!mir->mir_closing) {
1090                 mir->mir_timer_id = timeout(mir_timer, q,
1091                     MSEC_TO_TICK(intrvl));
1092         }
1093         mir->mir_timer_call = B_FALSE;
1094         cv_broadcast(&mir->mir_timer_cv);
1095 }
1096 
1097 static int
1098 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
1099 {
1100         mblk_t  *mp1;
1101         uint32_t  new_xid;
1102         uint32_t  old_xid;
1103 
1104         ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
1105         new_xid = BE32_TO_U32(&mp->b_rptr[4]);
1106         /*
1107          * This loop is a bit tacky -- it walks the STREAMS list of
1108          * flow-controlled messages.
1109          */
1110         if ((mp1 = q->q_first) != NULL) {
1111                 do {
1112                         old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
1113                         if (new_xid == old_xid)
1114                                 return (1);
1115                 } while ((mp1 = mp1->b_next) != NULL);
1116         }
1117         return (0);
1118 }
1119 
/*
 * Close routine for the connection-oriented (mir) side of rpcmod.  q is
 * the read-side queue (its write side is reached through WR(q)).
 *
 * Steps:
 * - Discards any partially reassembled inbound message.
 * - Marks the stream as closing and stops the idle timer.
 * - For a server stream, stops new requests from arriving
 *   (mir_svc_start_close()) and waits on mir_condvar until
 *   MIR_SVC_QUIESCED() is true and the write service procedure is no
 *   longer running.  It then turns off the queue procedures and notifies
 *   kRPC through svc_queueclose().
 * - Finally destroys the synchronization objects and frees the mir
 *   structure.
 *
 * Always returns 0.
 */
static int
mir_close(queue_t *q)
{
        mir_t   *mir = q->q_ptr;
        mblk_t  *mp;
        bool_t queue_cleaned = FALSE;   /* svc_queueclean() runs at most once */

        RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
        mutex_enter(&mir->mir_mutex);
        /* Drop any partially reassembled inbound message. */
        if ((mp = mir->mir_head_mp) != NULL) {
                mir->mir_head_mp = NULL;
                mir->mir_tail_mp = NULL;
                freemsg(mp);
        }
        /*
         * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
         * is TRUE.  And mir_timer_start() won't start the timer again.
         */
        mir->mir_closing = B_TRUE;
        mir_timer_stop(mir);

        if (mir->mir_type == RPC_SERVER) {
                flushq(q, FLUSHDATA);   /* Ditch anything waiting on read q */

                /*
                 * This will prevent more requests from arriving and
                 * will force rpcmod to ignore flow control.
                 */
                mir_svc_start_close(WR(q), mir);

                /*
                 * Wait until no requests are outstanding, the read service
                 * procedure is idle and the write service procedure has
                 * finished.
                 */
                while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {

                        /*
                         * References remain but the read service procedure
                         * is idle: ask SVC (once) to clean up its queue.
                         * The mutex is dropped for the call and the
                         * condition is re-evaluated afterwards.
                         */
                        if (mir->mir_ref_cnt && !mir->mir_inrservice &&
                            (queue_cleaned == FALSE)) {
                                /*
                                 * call into SVC to clean the queue
                                 */
                                mutex_exit(&mir->mir_mutex);
                                svc_queueclean(q);
                                queue_cleaned = TRUE;
                                mutex_enter(&mir->mir_mutex);
                                continue;
                        }

                        /*
                         * Bugid 1253810 - Force the write service
                         * procedure to send its messages, regardless
                         * whether the downstream  module is ready
                         * to accept data.
                         */
                        if (mir->mir_inwservice == 1)
                                qenable(WR(q));

                        cv_wait(&mir->mir_condvar, &mir->mir_mutex);
                }

                mutex_exit(&mir->mir_mutex);
                qprocsoff(q);

                /* Notify kRPC that this stream is going away. */
                svc_queueclose(q);
        } else {
                mutex_exit(&mir->mir_mutex);
                qprocsoff(q);
        }

        /* Queue procedures are off; tear down the per-stream state. */
        mutex_destroy(&mir->mir_mutex);
        cv_destroy(&mir->mir_condvar);
        cv_destroy(&mir->mir_timer_cv);
        kmem_free(mir, sizeof (mir_t));
        return (0);
}
1193 
1194 /*
1195  * This is server side only (RPC_SERVER).
1196  *
1197  * Exit idle mode.
1198  */
1199 static void
1200 mir_svc_idle_stop(queue_t *q, mir_t *mir)
1201 {
1202         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1203         ASSERT((q->q_flag & QREADR) == 0);
1204         ASSERT(mir->mir_type == RPC_SERVER);
1205         RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
1206 
1207         mir_timer_stop(mir);
1208 }
1209 
1210 /*
1211  * This is server side only (RPC_SERVER).
1212  *
1213  * Start idle processing, which will include setting idle timer if the
1214  * stream is not being closed.
1215  */
1216 static void
1217 mir_svc_idle_start(queue_t *q, mir_t *mir)
1218 {
1219         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1220         ASSERT((q->q_flag & QREADR) == 0);
1221         ASSERT(mir->mir_type == RPC_SERVER);
1222         RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
1223 
1224         /*
1225          * Don't re-start idle timer if we are closing queues.
1226          */
1227         if (mir->mir_closing) {
1228                 RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
1229                     (void *)q);
1230 
1231                 /*
1232                  * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
1233                  * is true.  When it is true, and we are in the process of
1234                  * closing the stream, signal any thread waiting in
1235                  * mir_close().
1236                  */
1237                 if (mir->mir_inwservice == 0)
1238                         cv_signal(&mir->mir_condvar);
1239 
1240         } else {
1241                 RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
1242                     mir->mir_ordrel_pending ? "ordrel" : "normal");
1243                 /*
1244                  * Normal condition, start the idle timer.  If an orderly
1245                  * release has been sent, set the timeout to wait for the
1246                  * client to close its side of the connection.  Otherwise,
1247                  * use the normal idle timeout.
1248                  */
1249                 mir_timer_start(q, mir, mir->mir_ordrel_pending ?
1250                     svc_ordrel_timeout : mir->mir_idle_timeout);
1251         }
1252 }
1253 
1254 /* ARGSUSED */
1255 static int
1256 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
1257 {
1258         mir_t   *mir;
1259 
1260         RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
1261         /* Set variables used directly by kRPC. */
1262         if (!mir_rele)
1263                 mir_rele = mir_svc_release;
1264         if (!mir_start)
1265                 mir_start = mir_svc_start;
1266         if (!clnt_stop_idle)
1267                 clnt_stop_idle = mir_clnt_idle_do_stop;
1268         if (!clnt_max_msg_sizep)
1269                 clnt_max_msg_sizep = &clnt_max_msg_size;
1270         if (!svc_max_msg_sizep)
1271                 svc_max_msg_sizep = &svc_max_msg_size;
1272 
1273         /* Allocate a zero'ed out mir structure for this stream. */
1274         mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
1275 
1276         /*
1277          * We set hold inbound here so that incoming messages will
1278          * be held on the read-side queue until the stream is completely
1279          * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
1280          * the ioctl processing, the flag is cleared and any messages that
1281          * arrived between the open and the ioctl are delivered to kRPC.
1282          *
1283          * Early data should never arrive on a client stream since
1284          * servers only respond to our requests and we do not send any.
1285          * until after the stream is initialized.  Early data is
1286          * very common on a server stream where the client will start
1287          * sending data as soon as the connection is made (and this
1288          * is especially true with TCP where the protocol accepts the
1289          * connection before nfsd or kRPC is notified about it).
1290          */
1291 
1292         mir->mir_hold_inbound = 1;
1293 
1294         /*
1295          * Start the record marker looking for a 4-byte header.  When
1296          * this length is negative, it indicates that rpcmod is looking
1297          * for bytes to consume for the record marker header.  When it
1298          * is positive, it holds the number of bytes that have arrived
1299          * for the current fragment and are being held in mir_header_mp.
1300          */
1301 
1302         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1303 
1304         mir->mir_zoneid = rpc_zoneid();
1305         mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
1306         cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
1307         cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
1308 
1309         q->q_ptr = (char *)mir;
1310         WR(q)->q_ptr = (char *)mir;
1311 
1312         /*
1313          * We noenable the read-side queue because we don't want it
1314          * automatically enabled by putq.  We enable it explicitly
1315          * in mir_wsrv when appropriate. (See additional comments on
1316          * flow control at the beginning of mir_rsrv.)
1317          */
1318         noenable(q);
1319 
1320         qprocson(q);
1321         return (0);
1322 }
1323 
1324 /*
1325  * Read-side put routine for both the client and server side.  Does the
1326  * record marking for incoming RPC messages, and when complete, dispatches
1327  * the message to either the client or server.
1328  */
1329 static void
1330 mir_rput(queue_t *q, mblk_t *mp)
1331 {
1332         int     excess;
1333         int32_t frag_len, frag_header;
1334         mblk_t  *cont_mp, *head_mp, *tail_mp, *mp1;
1335         mir_t   *mir = q->q_ptr;
1336         boolean_t stop_timer = B_FALSE;
1337 
1338         ASSERT(mir != NULL);
1339 
1340         /*
1341          * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
1342          * with the corresponding ioctl, then don't accept
1343          * any inbound data.  This should never happen for streams
1344          * created by nfsd or client-side kRPC because they are careful
1345          * to set the mode of the stream before doing anything else.
1346          */
1347         if (mir->mir_type == 0) {
1348                 freemsg(mp);
1349                 return;
1350         }
1351 
1352         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1353 
1354         switch (mp->b_datap->db_type) {
1355         case M_DATA:
1356                 break;
1357         case M_PROTO:
1358         case M_PCPROTO:
1359                 if (MBLKL(mp) < sizeof (t_scalar_t)) {
1360                         RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
1361                             (int)MBLKL(mp));
1362                         freemsg(mp);
1363                         return;
1364                 }
1365                 if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
1366                         mir_rput_proto(q, mp);
1367                         return;
1368                 }
1369 
1370                 /* Throw away the T_DATA_IND block and continue with data. */
1371                 mp1 = mp;
1372                 mp = mp->b_cont;
1373                 freeb(mp1);
1374                 break;
1375         case M_SETOPTS:
1376                 /*
1377                  * If a module on the stream is trying set the Stream head's
1378                  * high water mark, then set our hiwater to the requested
1379                  * value.  We are the "stream head" for all inbound
1380                  * data messages since messages are passed directly to kRPC.
1381                  */
1382                 if (MBLKL(mp) >= sizeof (struct stroptions)) {
1383                         struct stroptions       *stropts;
1384 
1385                         stropts = (struct stroptions *)mp->b_rptr;
1386                         if ((stropts->so_flags & SO_HIWAT) &&
1387                             !(stropts->so_flags & SO_BAND)) {
1388                                 (void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
1389                         }
1390                 }
1391                 putnext(q, mp);
1392                 return;
1393         case M_FLUSH:
1394                 RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
1395                 RPCLOG(32, "on q 0x%p\n", (void *)q);
1396                 putnext(q, mp);
1397                 return;
1398         default:
1399                 putnext(q, mp);
1400                 return;
1401         }
1402 
1403         mutex_enter(&mir->mir_mutex);
1404 
1405         /*
1406          * If this connection is closing, don't accept any new messages.
1407          */
1408         if (mir->mir_svc_no_more_msgs) {
1409                 ASSERT(mir->mir_type == RPC_SERVER);
1410                 mutex_exit(&mir->mir_mutex);
1411                 freemsg(mp);
1412                 return;
1413         }
1414 
1415         /* Get local copies for quicker access. */
1416         frag_len = mir->mir_frag_len;
1417         frag_header = mir->mir_frag_header;
1418         head_mp = mir->mir_head_mp;
1419         tail_mp = mir->mir_tail_mp;
1420 
1421         /* Loop, processing each message block in the mp chain separately. */
1422         do {
1423                 cont_mp = mp->b_cont;
1424                 mp->b_cont = NULL;
1425 
1426                 /*
1427                  * Drop zero-length mblks to prevent unbounded kernel memory
1428                  * consumption.
1429                  */
1430                 if (MBLKL(mp) == 0) {
1431                         freeb(mp);
1432                         continue;
1433                 }
1434 
1435                 /*
1436                  * If frag_len is negative, we're still in the process of
1437                  * building frag_header -- try to complete it with this mblk.
1438                  */
1439                 while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
1440                         frag_len++;
1441                         frag_header <<= 8;
1442                         frag_header += *mp->b_rptr++;
1443                 }
1444 
1445                 if (MBLKL(mp) == 0 && frag_len < 0) {
1446                         /*
1447                          * We consumed this mblk while trying to complete the
1448                          * fragment header.  Free it and move on.
1449                          */
1450                         freeb(mp);
1451                         continue;
1452                 }
1453 
1454                 ASSERT(frag_len >= 0);
1455 
1456                 /*
1457                  * Now frag_header has the number of bytes in this fragment
1458                  * and we're just waiting to collect them all.  Chain our
1459                  * latest mblk onto the list and see if we now have enough
1460                  * bytes to complete the fragment.
1461                  */
1462                 if (head_mp == NULL) {
1463                         ASSERT(tail_mp == NULL);
1464                         head_mp = tail_mp = mp;
1465                 } else {
1466                         tail_mp->b_cont = mp;
1467                         tail_mp = mp;
1468                 }
1469 
1470                 frag_len += MBLKL(mp);
1471                 excess = frag_len - (frag_header & ~MIR_LASTFRAG);
1472                 if (excess < 0) {
1473                         /*
1474                          * We still haven't received enough data to complete
1475                          * the fragment, so continue on to the next mblk.
1476                          */
1477                         continue;
1478                 }
1479 
1480                 /*
1481                  * We've got a complete fragment.  If there are excess bytes,
1482                  * then they're part of the next fragment's header (of either
1483                  * this RPC message or the next RPC message).  Split that part
1484                  * into its own mblk so that we can safely freeb() it when
1485                  * building frag_header above.
1486                  */
1487                 if (excess > 0) {
1488                         if ((mp1 = dupb(mp)) == NULL &&
1489                             (mp1 = copyb(mp)) == NULL) {
1490                                 freemsg(head_mp);
1491                                 freemsg(cont_mp);
1492                                 RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
1493                                 mir->mir_frag_header = 0;
1494                                 mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1495                                 mir->mir_head_mp = NULL;
1496                                 mir->mir_tail_mp = NULL;
1497                                 mir_disconnect(q, mir); /* drops mir_mutex */
1498                                 return;
1499                         }
1500 
1501                         /*
1502                          * Relink the message chain so that the next mblk is
1503                          * the next fragment header, followed by the rest of
1504                          * the message chain.
1505                          */
1506                         mp1->b_cont = cont_mp;
1507                         cont_mp = mp1;
1508 
1509                         /*
1510                          * Data in the new mblk begins at the next fragment,
1511                          * and data in the old mblk ends at the next fragment.
1512                          */
1513                         mp1->b_rptr = mp1->b_wptr - excess;
1514                         mp->b_wptr -= excess;
1515                 }
1516 
1517                 /*
1518                  * Reset frag_len and frag_header for the next fragment.
1519                  */
1520                 frag_len = -(int32_t)sizeof (uint32_t);
1521                 if (!(frag_header & MIR_LASTFRAG)) {
1522                         /*
1523                          * The current fragment is complete, but more
1524                          * fragments need to be processed before we can
1525                          * pass along the RPC message headed at head_mp.
1526                          */
1527                         frag_header = 0;
1528                         continue;
1529                 }
1530                 frag_header = 0;
1531 
1532                 /*
1533                  * We've got a complete RPC message; pass it to the
1534                  * appropriate consumer.
1535                  */
1536                 switch (mir->mir_type) {
1537                 case RPC_CLIENT:
1538                         if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
1539                                 /*
1540                                  * Mark this stream as active.  This marker
1541                                  * is used in mir_timer().
1542                                  */
1543                                 mir->mir_clntreq = 1;
1544                                 mir->mir_use_timestamp = ddi_get_lbolt();
1545                         } else {
1546                                 freemsg(head_mp);
1547                         }
1548                         break;
1549 
1550                 case RPC_SERVER:
1551                         /*
1552                          * Check for flow control before passing the
1553                          * message to kRPC.
1554                          */
1555                         if (!mir->mir_hold_inbound) {
1556                                 if (mir->mir_krpc_cell) {
1557 
1558                                         if (mir_check_len(q, head_mp))
1559                                                 return;
1560 
1561                                         if (q->q_first == NULL &&
1562                                             svc_queuereq(q, head_mp, TRUE)) {
1563                                                 /*
1564                                                  * If the reference count is 0
1565                                                  * (not including this
1566                                                  * request), then the stream is
1567                                                  * transitioning from idle to
1568                                                  * non-idle.  In this case, we
1569                                                  * cancel the idle timer.
1570                                                  */
1571                                                 if (mir->mir_ref_cnt++ == 0)
1572                                                         stop_timer = B_TRUE;
1573                                         } else {
1574                                                 (void) putq(q, head_mp);
1575                                                 mir->mir_inrservice = B_TRUE;
1576                                         }
1577                                 } else {
1578                                         /*
1579                                          * Count # of times this happens. Should
1580                                          * be never, but experience shows
1581                                          * otherwise.
1582                                          */
1583                                         mir_krpc_cell_null++;
1584                                         freemsg(head_mp);
1585                                 }
1586                         } else {
1587                                 /*
1588                                  * If the outbound side of the stream is
1589                                  * flow controlled, then hold this message
1590                                  * until client catches up. mir_hold_inbound
1591                                  * is set in mir_wput and cleared in mir_wsrv.
1592                                  */
1593                                 (void) putq(q, head_mp);
1594                                 mir->mir_inrservice = B_TRUE;
1595                         }
1596                         break;
1597                 default:
1598                         RPCLOG(1, "mir_rput: unknown mir_type %d\n",
1599                             mir->mir_type);
1600                         freemsg(head_mp);
1601                         break;
1602                 }
1603 
1604                 /*
1605                  * Reset the chain since we're starting on a new RPC message.
1606                  */
1607                 head_mp = tail_mp = NULL;
1608         } while ((mp = cont_mp) != NULL);
1609 
1610         /*
1611          * Sanity check the message length; if it's too large mir_check_len()
1612          * will shutdown the connection, drop mir_mutex, and return non-zero.
1613          */
1614         if (head_mp != NULL && mir->mir_setup_complete &&
1615             mir_check_len(q, head_mp))
1616                 return;
1617 
1618         /* Save our local copies back in the mir structure. */
1619         mir->mir_frag_header = frag_header;
1620         mir->mir_frag_len = frag_len;
1621         mir->mir_head_mp = head_mp;
1622         mir->mir_tail_mp = tail_mp;
1623 
1624         /*
1625          * The timer is stopped after the whole message chain is processed.
1626          * The reason is that stopping the timer releases the mir_mutex
1627          * lock temporarily.  This means that the request can be serviced
1628          * while we are still processing the message chain.  This is not
1629          * good.  So we stop the timer here instead.
1630          *
1631          * Note that if the timer fires before we stop it, it will not
1632          * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
1633          * will just return.
1634          */
1635         if (stop_timer) {
1636                 RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
1637                     "ref cnt going to non zero\n", (void *)WR(q));
1638                 mir_svc_idle_stop(WR(q), mir);
1639         }
1640         mutex_exit(&mir->mir_mutex);
1641 }
1642 
/*
 * Read-side handler for TPI (M_PROTO/M_PCPROTO) messages.
 *
 * The primitive type is read from the front of mp and handled according
 * to how the stream was configured:
 *
 * RPC_CLIENT
 *	T_DISCON_IND/T_ORDREL_IND: discard any partially assembled inbound
 *	record, restart the idle timer, notify kRPC through
 *	clnt_dispatch_notifyall() and free mp.
 *	T_ERROR_ACK/T_OK_ACK for a T_DISCON_REQ: notify kRPC the same way
 *	and free mp.  Other acks, and T_CONN_CON, T_INFO_ACK and
 *	T_OPTMGMT_ACK, are offered to clnt_dispatch_notifyconn(); if it
 *	returns non-zero we return without touching mp again (it presumably
 *	took ownership -- TODO confirm).
 *
 * RPC_SERVER
 *	T_BIND_ACK with CONIND_number > 0: mark the stream as a listener
 *	and stop its idle timer.
 *	T_DISCON_IND/T_ORDREL_IND on a non-listening stream: begin the
 *	close phase.  If the stream is not quiesced, the indication is held
 *	in mir_svc_pend_mp instead of being passed up.  A new disconnect
 *	replaces whatever is already held; a new orderly release is dropped
 *	if something is already held.  mir_rsrv/mir_svc_release send the
 *	held indication upstream once the stream goes idle.
 *
 * Every message that is not freed, held, or consumed above is passed
 * upstream with putnext().  Must be called without mir_mutex held.
 */
static void
mir_rput_proto(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	uint32_t	type;
	uint32_t reason = 0;	/* only set for T_DISCON_IND */

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	type = ((union T_primitives *)mp->b_rptr)->type;
	switch (mir->mir_type) {
	case RPC_CLIENT:
		switch (type) {
		case T_DISCON_IND:
			reason = ((struct T_discon_ind *)
			    (mp->b_rptr))->DISCON_reason;
			/*FALLTHROUGH*/
		case T_ORDREL_IND:
			mutex_enter(&mir->mir_mutex);
			/*
			 * The connection is gone, so any partially assembled
			 * inbound record can never be completed; drop it.
			 */
			if (mir->mir_head_mp) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = (mblk_t *)0;
				mir->mir_tail_mp = (mblk_t *)0;
			}
			/*
			 * We are disconnecting, but not necessarily
			 * closing. By not closing, we will fail to
			 * pick up a possibly changed global timeout value,
			 * unless we store it now.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(WR(q), mir);

			/*
			 * Even though we are unconnected, we still
			 * leave the idle timer going on the client. The
			 * reason for is that if we've disconnected due
			 * to a server-side disconnect, reset, or connection
			 * timeout, there is a possibility the client may
			 * retry the RPC request. This retry needs to done on
			 * the same bound address for the server to interpret
			 * it as such. However, we don't want
			 * to wait forever for that possibility. If the
			 * end-point stays unconnected for mir_idle_timeout
			 * units of time, then that is a signal to the
			 * connection manager to give up waiting for the
			 * application (eg. NFS) to send a retry.
			 */
			mir_clnt_idle_start(WR(q), mir);
			mutex_exit(&mir->mir_mutex);
			/* Tell kRPC outside of mir_mutex. */
			clnt_dispatch_notifyall(WR(q), type, reason);
			freemsg(mp);
			return;
		case T_ERROR_ACK:
		{
			struct T_error_ack	*terror;

			terror = (struct T_error_ack *)mp->b_rptr;
			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
			    (void *)q);
			RPCLOG(1, " ERROR_prim: %s,",
			    rpc_tpiprim2name(terror->ERROR_prim));
			RPCLOG(1, " TLI_error: %s,",
			    rpc_tpierr2name(terror->TLI_error));
			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
			/*
			 * A failed T_DISCON_REQ is reported to kRPC like a
			 * disconnect (reason is still 0 here); any other
			 * error ack goes to the connection-setup waiter.
			 */
			if (terror->ERROR_prim == T_DISCON_REQ)  {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_OK_ACK:
		{
			struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr;

			/* Same split as T_ERROR_ACK above. */
			if (tok->CORRECT_prim == T_DISCON_REQ) {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_CONN_CON:
		case T_INFO_ACK:
		case T_OPTMGMT_ACK:
			if (clnt_dispatch_notifyconn(WR(q), mp))
				return;
			break;
		case T_BIND_ACK:
			break;
		default:
			RPCLOG(1, "mir_rput: unexpected message %d "
			    "for kRPC client\n",
			    ((union T_primitives *)mp->b_rptr)->type);
			break;
		}
		break;

	case RPC_SERVER:
		switch (type) {
		case T_BIND_ACK:
		{
			struct T_bind_ack	*tbind;

			/*
			 * If this is a listening stream, then shut
			 * off the idle timer.
			 */
			tbind = (struct T_bind_ack *)mp->b_rptr;
			if (tbind->CONIND_number > 0) {
				mutex_enter(&mir->mir_mutex);
				mir_svc_idle_stop(WR(q), mir);

				/*
				 * mark this as a listen endpoint
				 * for special handling.
				 */

				mir->mir_listen_stream = 1;
				mutex_exit(&mir->mir_mutex);
			}
			break;
		}
		case T_DISCON_IND:
		case T_ORDREL_IND:
			RPCLOG(16, "mir_rput_proto: got %s indication\n",
			    type == T_DISCON_IND ? "disconnect"
			    : "orderly release");

			/*
			 * For listen endpoint just pass
			 * on the message.
			 *
			 * NOTE(review): mir_listen_stream is read here without
			 * mir_mutex, although it is written under mir_mutex in
			 * the T_BIND_ACK case above; confirm an unlocked read
			 * is safe.
			 */

			if (mir->mir_listen_stream)
				break;

			mutex_enter(&mir->mir_mutex);

			/*
			 * If client wants to break off connection, record
			 * that fact.
			 */
			mir_svc_start_close(WR(q), mir);

			/*
			 * If we are idle, then send the orderly release
			 * or disconnect indication to nfsd.
			 */
			if (MIR_SVC_QUIESCED(mir)) {
				mutex_exit(&mir->mir_mutex);
				break;
			}

			RPCLOG(16, "mir_rput_proto: not idle, so "
			    "disconnect/ord rel indication not passed "
			    "upstream on 0x%p\n", (void *)q);

			/*
			 * Hold the indication until we get idle
			 * If there already is an indication stored,
			 * replace it if the new one is a disconnect. The
			 * reasoning is that disconnection takes less time
			 * to process, and once a client decides to
			 * disconnect, we should do that.
			 */
			if (mir->mir_svc_pend_mp) {
				if (type == T_DISCON_IND) {
					RPCLOG(16, "mir_rput_proto: replacing"
					    " held disconnect/ord rel"
					    " indication with disconnect on"
					    " 0x%p\n", (void *)q);

					freemsg(mir->mir_svc_pend_mp);
					mir->mir_svc_pend_mp = mp;
				} else {
					RPCLOG(16, "mir_rput_proto: already "
					    "held a disconnect/ord rel "
					    "indication. freeing ord rel "
					    "ind on 0x%p\n", (void *)q);
					freemsg(mp);
				}
			} else
				mir->mir_svc_pend_mp = mp;

			mutex_exit(&mir->mir_mutex);
			/* mp has been held or freed; do not pass it up. */
			return;

		default:
			/* nfsd handles server-side non-data messages. */
			break;
		}
		break;

	default:
		break;
	}

	putnext(q, mp);
}
1850 
1851 /*
1852  * The server-side read queues are used to hold inbound messages while
1853  * outbound flow control is exerted.  When outbound flow control is
1854  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
1855  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
1856  */
1857 static void
1858 mir_rsrv(queue_t *q)
1859 {
1860         mir_t   *mir;
1861         mblk_t  *mp;
1862         boolean_t stop_timer = B_FALSE;
1863 
1864         mir = (mir_t *)q->q_ptr;
1865         mutex_enter(&mir->mir_mutex);
1866 
1867         mp = NULL;
1868         switch (mir->mir_type) {
1869         case RPC_SERVER:
1870                 if (mir->mir_ref_cnt == 0)
1871                         mir->mir_hold_inbound = 0;
1872                 if (mir->mir_hold_inbound)
1873                         break;
1874 
1875                 while (mp = getq(q)) {
1876                         if (mir->mir_krpc_cell &&
1877                             (mir->mir_svc_no_more_msgs == 0)) {
1878 
1879                                 if (mir_check_len(q, mp))
1880                                         return;
1881 
1882                                 if (svc_queuereq(q, mp, TRUE)) {
1883                                         /*
1884                                          * If we were idle, turn off idle timer
1885                                          * since we aren't idle any more.
1886                                          */
1887                                         if (mir->mir_ref_cnt++ == 0)
1888                                                 stop_timer = B_TRUE;
1889                                 } else {
1890                                         (void) putbq(q, mp);
1891                                         break;
1892                                 }
1893                         } else {
1894                                 /*
1895                                  * Count # of times this happens. Should be
1896                                  * never, but experience shows otherwise.
1897                                  */
1898                                 if (mir->mir_krpc_cell == NULL)
1899                                         mir_krpc_cell_null++;
1900                                 freemsg(mp);
1901                         }
1902                 }
1903                 break;
1904         case RPC_CLIENT:
1905                 break;
1906         default:
1907                 RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
1908 
1909                 if (q->q_first == NULL)
1910                         MIR_CLEAR_INRSRV(mir);
1911 
1912                 mutex_exit(&mir->mir_mutex);
1913 
1914                 return;
1915         }
1916 
1917         /*
1918          * The timer is stopped after all the messages are processed.
1919          * The reason is that stopping the timer releases the mir_mutex
1920          * lock temporarily.  This means that the request can be serviced
1921          * while we are still processing the message queue.  This is not
1922          * good.  So we stop the timer here instead.
1923          */
1924         if (stop_timer)  {
1925                 RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
1926                     "cnt going to non zero\n", (void *)WR(q));
1927                 mir_svc_idle_stop(WR(q), mir);
1928         }
1929 
1930         if (q->q_first == NULL) {
1931                 mblk_t  *cmp = NULL;
1932 
1933                 MIR_CLEAR_INRSRV(mir);
1934 
1935                 if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
1936                         cmp = mir->mir_svc_pend_mp;
1937                         mir->mir_svc_pend_mp = NULL;
1938                 }
1939 
1940                 mutex_exit(&mir->mir_mutex);
1941 
1942                 if (cmp != NULL) {
1943                         RPCLOG(16, "mir_rsrv: line %d: sending a held "
1944                             "disconnect/ord rel indication upstream\n",
1945                             __LINE__);
1946                         putnext(q, cmp);
1947                 }
1948 
1949                 return;
1950         }
1951         mutex_exit(&mir->mir_mutex);
1952 }
1953 
1954 static int mir_svc_policy_fails;
1955 
1956 /*
1957  * Called to send an event code to nfsd/lockd so that it initiates
1958  * connection close.
1959  */
1960 static int
1961 mir_svc_policy_notify(queue_t *q, int event)
1962 {
1963         mblk_t  *mp;
1964 #ifdef DEBUG
1965         mir_t *mir = (mir_t *)q->q_ptr;
1966         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1967 #endif
1968         ASSERT(q->q_flag & QREADR);
1969 
1970         /*
1971          * Create an M_DATA message with the event code and pass it to the
1972          * Stream head (nfsd or whoever created the stream will consume it).
1973          */
1974         mp = allocb(sizeof (int), BPRI_HI);
1975 
1976         if (!mp) {
1977 
1978                 mir_svc_policy_fails++;
1979                 RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
1980                     "%d\n", event);
1981                 return (ENOMEM);
1982         }
1983 
1984         U32_TO_BE32(event, mp->b_rptr);
1985         mp->b_wptr = mp->b_rptr + sizeof (int);
1986         putnext(q, mp);
1987         return (0);
1988 }
1989 
1990 /*
1991  * Server side: start the close phase. We want to get this rpcmod slot in an
1992  * idle state before mir_close() is called.
1993  */
1994 static void
1995 mir_svc_start_close(queue_t *wq, mir_t *mir)
1996 {
1997         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1998         ASSERT((wq->q_flag & QREADR) == 0);
1999         ASSERT(mir->mir_type == RPC_SERVER);
2000 
2001         /*
2002          * Do not accept any more messages.
2003          */
2004         mir->mir_svc_no_more_msgs = 1;
2005 
2006         /*
2007          * Next two statements will make the read service procedure
2008          * free everything stuck in the streams read queue.
2009          * It's not necessary because enabling the write queue will
2010          * have the same effect, but why not speed the process along?
2011          */
2012         mir->mir_hold_inbound = 0;
2013         qenable(RD(wq));
2014 
2015         /*
2016          * Meanwhile force the write service procedure to send the
2017          * responses downstream, regardless of flow control.
2018          */
2019         qenable(wq);
2020 }
2021 
/*
 * This routine is called directly by kRPC after a request is completed,
 * whether a reply was sent or the request was dropped.
 *
 * wq is the write queue.  If mp is non-NULL it is freed.  If enable is
 * set, the read-side queue is qenable'd.  The request's reference
 * (mir_ref_cnt, taken when the request was queued to kRPC) is then
 * dropped.
 *
 * If this was the last reference and the read service procedure is not
 * active (mir_inrservice == 0), two things happen:
 * - any held disconnect/orderly-release indication (mir_svc_pend_mp) is
 *   sent upstream;
 * - the server idle timer is started.
 *
 * A closer waiting for the count to reach zero is signalled through
 * mir_condvar.
 */
static void
mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;
	mblk_t	*cmp = NULL;

	ASSERT((wq->q_flag & QREADR) == 0);
	if (mp)
		freemsg(mp);

	if (enable)
		qenable(RD(wq));

	mutex_enter(&mir->mir_mutex);

	/*
	 * If this is the last reference and mir_rsrv is not running,
	 * take ownership of any held disconnect/ord rel indication so it
	 * can be sent upstream.
	 */
	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
		cmp = mir->mir_svc_pend_mp;
		mir->mir_svc_pend_mp = NULL;
	}

	if (cmp) {
		RPCLOG(16, "mir_svc_release: sending a held "
		    "disconnect/ord rel indication upstream on queue 0x%p\n",
		    (void *)RD(wq));

		/* Do not hold mir_mutex across putnext(). */
		mutex_exit(&mir->mir_mutex);

		putnext(RD(wq), cmp);

		mutex_enter(&mir->mir_mutex);
	}

	/*
	 * Start idle processing if this is the last reference.  The
	 * condition is re-evaluated because mir_mutex may have been
	 * dropped around putnext() above.
	 */
	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {

		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
		    "because ref cnt is zero\n", (void *) wq);

		mir_svc_idle_start(wq, mir);
	}

	/* Drop this request's reference. */
	mir->mir_ref_cnt--;
	ASSERT(mir->mir_ref_cnt >= 0);

	/*
	 * Wake up the thread waiting to close.
	 */

	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
		cv_signal(&mir->mir_condvar);

	mutex_exit(&mir->mir_mutex);
}
2084 
2085 /*
2086  * This routine is called by server-side kRPC when it is ready to
2087  * handle inbound messages on the stream.
2088  */
2089 static void
2090 mir_svc_start(queue_t *wq)
2091 {
2092         mir_t   *mir = (mir_t *)wq->q_ptr;
2093 
2094         /*
2095          * no longer need to take the mir_mutex because the
2096          * mir_setup_complete field has been moved out of
2097          * the binary field protected by the mir_mutex.
2098          */
2099 
2100         mir->mir_setup_complete = 1;
2101         qenable(RD(wq));
2102 }
2103 
2104 /*
2105  * client side wrapper for stopping timer with normal idle timeout.
2106  */
2107 static void
2108 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
2109 {
2110         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2111         ASSERT((wq->q_flag & QREADR) == 0);
2112         ASSERT(mir->mir_type == RPC_CLIENT);
2113 
2114         mir_timer_stop(mir);
2115 }
2116 
2117 /*
2118  * client side wrapper for stopping timer with normal idle timeout.
2119  */
2120 static void
2121 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
2122 {
2123         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2124         ASSERT((wq->q_flag & QREADR) == 0);
2125         ASSERT(mir->mir_type == RPC_CLIENT);
2126 
2127         mir_timer_start(wq, mir, mir->mir_idle_timeout);
2128 }
2129 
2130 /*
2131  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
2132  * end-points that aren't connected.
2133  */
2134 static void
2135 mir_clnt_idle_do_stop(queue_t *wq)
2136 {
2137         mir_t   *mir = (mir_t *)wq->q_ptr;
2138 
2139         RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
2140         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2141         mutex_enter(&mir->mir_mutex);
2142         mir_clnt_idle_stop(wq, mir);
2143         mutex_exit(&mir->mir_mutex);
2144 }
2145 
2146 /*
2147  * Timer handler.  It handles idle timeout and memory shortage problem.
2148  */
2149 static void
2150 mir_timer(void *arg)
2151 {
2152         queue_t *wq = (queue_t *)arg;
2153         mir_t *mir = (mir_t *)wq->q_ptr;
2154         boolean_t notify;
2155         clock_t now;
2156 
2157         mutex_enter(&mir->mir_mutex);
2158 
2159         /*
2160          * mir_timer_call is set only when either mir_timer_[start|stop]
2161          * is progressing.  And mir_timer() can only be run while they
2162          * are progressing if the timer is being stopped.  So just
2163          * return.
2164          */
2165         if (mir->mir_timer_call) {
2166                 mutex_exit(&mir->mir_mutex);
2167                 return;
2168         }
2169         mir->mir_timer_id = 0;
2170 
2171         switch (mir->mir_type) {
2172         case RPC_CLIENT:
2173 
2174                 /*
2175                  * For clients, the timer fires at clnt_idle_timeout
2176                  * intervals.  If the activity marker (mir_clntreq) is
2177                  * zero, then the stream has been idle since the last
2178                  * timer event and we notify kRPC.  If mir_clntreq is
2179                  * non-zero, then the stream is active and we just
2180                  * restart the timer for another interval.  mir_clntreq
2181                  * is set to 1 in mir_wput for every request passed
2182                  * downstream.
2183                  *
2184                  * If this was a memory shortage timer reset the idle
2185                  * timeout regardless; the mir_clntreq will not be a
2186                  * valid indicator.
2187                  *
2188                  * The timer is initially started in mir_wput during
2189                  * RPC_CLIENT ioctl processing.
2190                  *
2191                  * The timer interval can be changed for individual
2192                  * streams with the ND variable "mir_idle_timeout".
2193                  */
2194                 now = ddi_get_lbolt();
2195                 if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
2196                     MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
2197                         clock_t tout;
2198 
2199                         tout = mir->mir_idle_timeout -
2200                             TICK_TO_MSEC(now - mir->mir_use_timestamp);
2201                         if (tout < 0)
2202                                 tout = 1000;
2203 #if 0
2204                         printf("mir_timer[%d < %d + %d]: reset client timer "
2205                             "to %d (ms)\n", TICK_TO_MSEC(now),
2206                             TICK_TO_MSEC(mir->mir_use_timestamp),
2207                             mir->mir_idle_timeout, tout);
2208 #endif
2209                         mir->mir_clntreq = 0;
2210                         mir_timer_start(wq, mir, tout);
2211                         mutex_exit(&mir->mir_mutex);
2212                         return;
2213                 }
2214 #if 0
2215 printf("mir_timer[%d]: doing client timeout\n", now / hz);
2216 #endif
2217                 /*
2218                  * We are disconnecting, but not necessarily
2219                  * closing. By not closing, we will fail to
2220                  * pick up a possibly changed global timeout value,
2221                  * unless we store it now.
2222                  */
2223                 mir->mir_idle_timeout = clnt_idle_timeout;
2224                 mir_clnt_idle_start(wq, mir);
2225 
2226                 mutex_exit(&mir->mir_mutex);
2227                 /*
2228                  * We pass T_ORDREL_REQ as an integer value
2229                  * to kRPC as the indication that the stream
2230                  * is idle.  This is not a T_ORDREL_REQ message,
2231                  * it is just a convenient value since we call
2232                  * the same kRPC routine for T_ORDREL_INDs and
2233                  * T_DISCON_INDs.
2234                  */
2235                 clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
2236                 return;
2237 
2238         case RPC_SERVER:
2239 
2240                 /*
2241                  * For servers, the timer is only running when the stream
2242                  * is really idle or memory is short.  The timer is started
2243                  * by mir_wput when mir_type is set to RPC_SERVER and
2244                  * by mir_svc_idle_start whenever the stream goes idle
2245                  * (mir_ref_cnt == 0).  The timer is cancelled in
2246                  * mir_rput whenever a new inbound request is passed to kRPC
2247                  * and the stream was previously idle.
2248                  *
2249                  * The timer interval can be changed for individual
2250                  * streams with the ND variable "mir_idle_timeout".
2251                  *
2252                  * If the stream is not idle do nothing.
2253                  */
2254                 if (!MIR_SVC_QUIESCED(mir)) {
2255                         mutex_exit(&mir->mir_mutex);
2256                         return;
2257                 }
2258 
2259                 notify = !mir->mir_inrservice;
2260                 mutex_exit(&mir->mir_mutex);
2261 
2262                 /*
2263                  * If there is no packet queued up in read queue, the stream
2264                  * is really idle so notify nfsd to close it.
2265                  */
2266                 if (notify) {
2267                         RPCLOG(16, "mir_timer: telling stream head listener "
2268                             "to close stream (0x%p)\n", (void *) RD(wq));
2269                         (void) mir_svc_policy_notify(RD(wq), 1);
2270                 }
2271                 return;
2272         default:
2273                 RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
2274                     mir->mir_type);
2275                 mutex_exit(&mir->mir_mutex);
2276                 return;
2277         }
2278 }
2279 
/*
 * Write-side put procedure.  Called by the RPC package to send either a
 * call or a return, or a transport connection request.  Adds the record
 * marking header.
 *
 * Non-M_DATA messages are handed to mir_wput_other().  M_DATA messages
 * are dropped if there is no mir_t, or if data arrives after a
 * T_ORDREL_REQ (mir_ordrel_pending).
 *
 * Otherwise a 4-byte record mark is written in network byte order into
 * the headroom that kRPC is expected to leave.  The mark holds DLEN(mp)
 * with MIR_LASTFRAG set.  Messages without that headroom, or not 4-byte
 * aligned, are dropped.
 *
 * The framed message is sent downstream immediately if nothing is queued
 * and the downstream can accept it.  Otherwise it is queued for the write
 * service procedure.
 */
static void
mir_wput(queue_t *q, mblk_t *mp)
{
	uint_t	frag_header;
	mir_t	*mir = (mir_t *)q->q_ptr;
	uchar_t	*rptr = mp->b_rptr;

	/*
	 * No per-stream state: drop the message.  NOTE(review): presumably
	 * the stream is not (or no longer) fully set up -- confirm.
	 */
	if (!mir) {
		freemsg(mp);
		return;
	}

	if (mp->b_datap->db_type != M_DATA) {
		mir_wput_other(q, mp);
		return;
	}

	if (mir->mir_ordrel_pending == 1) {
		freemsg(mp);
		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
		    (void *)q);
		return;
	}

	/* Single-fragment record: length plus the last-fragment bit. */
	frag_header = (uint_t)DLEN(mp);
	frag_header |= MIR_LASTFRAG;

	/* Stick in the 4 byte record marking header. */
	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
		/*
		 * Since we know that M_DATA messages are created exclusively
		 * by kRPC, we expect that kRPC will leave room for our header
		 * and 4 byte align which is normal for XDR.
		 * If kRPC (or someone else) does not cooperate, then we
		 * just throw away the message.
		 */
		RPCLOG(1, "mir_wput: kRPC did not leave space for record "
		    "fragment header (%d bytes left)\n",
		    (int)(rptr - mp->b_datap->db_base));
		freemsg(mp);
		return;
	}
	rptr -= sizeof (uint32_t);
	*(uint32_t *)rptr = htonl(frag_header);
	mp->b_rptr = rptr;

	mutex_enter(&mir->mir_mutex);
	if (mir->mir_type == RPC_CLIENT) {
		/*
		 * For the client, set mir_clntreq to indicate that the
		 * connection is active.  The timestamp lets mir_timer work
		 * out how much of the idle interval remains.
		 */
		mir->mir_clntreq = 1;
		mir->mir_use_timestamp = ddi_get_lbolt();
	}

	/*
	 * If we haven't already queued some data and the downstream module
	 * can accept more data, send it on, otherwise we queue the message
	 * and take other actions depending on mir_type.
	 */
	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
		mutex_exit(&mir->mir_mutex);

		/*
		 * Now we pass the RPC message downstream.
		 */
		putnext(q, mp);
		return;
	}

	/* Queued path; mir_mutex is still held. */
	switch (mir->mir_type) {
	case RPC_CLIENT:
		/*
		 * Check for a previous duplicate request on the
		 * queue.  If there is one, then we throw away
		 * the current message and let the previous one
		 * go through.  If we can't find a duplicate, then
		 * send this one.  This tap dance is an effort
		 * to reduce traffic and processing requirements
		 * under load conditions.
		 */
		if (mir_clnt_dup_request(q, mp)) {
			mutex_exit(&mir->mir_mutex);
			freemsg(mp);
			return;
		}
		break;
	case RPC_SERVER:
		/*
		 * Set mir_hold_inbound so that new inbound RPC
		 * messages will be held until the client catches
		 * up on the earlier replies.  This flag is cleared
		 * in mir_wsrv after flow control is relieved;
		 * the read-side queue is also enabled at that time.
		 */
		mir->mir_hold_inbound = 1;
		break;
	default:
		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
		break;
	}
	/*
	 * Mark that data is queued so later callers queue behind it
	 * (see the mir_inwservice check above).
	 */
	mir->mir_inwservice = 1;
	(void) putq(q, mp);
	mutex_exit(&mir->mir_mutex);
}
2391 
2392 static void
2393 mir_wput_other(queue_t *q, mblk_t *mp)
2394 {
2395         mir_t   *mir = (mir_t *)q->q_ptr;
2396         struct iocblk   *iocp;
2397         uchar_t *rptr = mp->b_rptr;
2398         bool_t  flush_in_svc = FALSE;
2399 
2400         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2401         switch (mp->b_datap->db_type) {
2402         case M_IOCTL:
2403                 iocp = (struct iocblk *)rptr;
2404                 switch (iocp->ioc_cmd) {
2405                 case RPC_CLIENT:
2406                         mutex_enter(&mir->mir_mutex);
2407                         if (mir->mir_type != 0 &&
2408                             mir->mir_type != iocp->ioc_cmd) {
2409 ioc_eperm:
2410                                 mutex_exit(&mir->mir_mutex);
2411                                 iocp->ioc_error = EPERM;
2412                                 iocp->ioc_count = 0;
2413                                 mp->b_datap->db_type = M_IOCACK;
2414                                 qreply(q, mp);
2415                                 return;
2416                         }
2417 
2418                         mir->mir_type = iocp->ioc_cmd;
2419 
2420                         /*
2421                          * Clear mir_hold_inbound which was set to 1 by
2422                          * mir_open.  This flag is not used on client
2423                          * streams.
2424                          */
2425                         mir->mir_hold_inbound = 0;
2426                         mir->mir_max_msg_sizep = &clnt_max_msg_size;
2427 
2428                         /*
2429                          * Start the idle timer.  See mir_timer() for more
2430                          * information on how client timers work.
2431                          */
2432                         mir->mir_idle_timeout = clnt_idle_timeout;
2433                         mir_clnt_idle_start(q, mir);
2434                         mutex_exit(&mir->mir_mutex);
2435 
2436                         mp->b_datap->db_type = M_IOCACK;
2437                         qreply(q, mp);
2438                         return;
2439                 case RPC_SERVER:
2440                         mutex_enter(&mir->mir_mutex);
2441                         if (mir->mir_type != 0 &&
2442                             mir->mir_type != iocp->ioc_cmd)
2443                                 goto ioc_eperm;
2444 
2445                         /*
2446                          * We don't clear mir_hold_inbound here because
2447                          * mir_hold_inbound is used in the flow control
2448                          * model. If we cleared it here, then we'd commit
2449                          * a small violation to the model where the transport
2450                          * might immediately block downstream flow.
2451                          */
2452 
2453                         mir->mir_type = iocp->ioc_cmd;
2454                         mir->mir_max_msg_sizep = &svc_max_msg_size;
2455 
2456                         /*
2457                          * Start the idle timer.  See mir_timer() for more
2458                          * information on how server timers work.
2459                          *
2460                          * Note that it is important to start the idle timer
2461                          * here so that connections time out even if we
2462                          * never receive any data on them.
2463                          */
2464                         mir->mir_idle_timeout = svc_idle_timeout;
2465                         RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
2466                             "because we got RPC_SERVER ioctl\n", (void *)q);
2467                         mir_svc_idle_start(q, mir);
2468                         mutex_exit(&mir->mir_mutex);
2469 
2470                         mp->b_datap->db_type = M_IOCACK;
2471                         qreply(q, mp);
2472                         return;
2473                 default:
2474                         break;
2475                 }
2476                 break;
2477 
2478         case M_PROTO:
2479                 if (mir->mir_type == RPC_CLIENT) {
2480                         /*
2481                          * We are likely being called from the context of a
2482                          * service procedure. So we need to enqueue. However
2483                          * enqueing may put our message behind data messages.
2484                          * So flush the data first.
2485                          */
2486                         flush_in_svc = TRUE;
2487                 }
2488                 if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
2489                     !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
2490                         break;
2491 
2492                 switch (((union T_primitives *)rptr)->type) {
2493                 case T_DATA_REQ:
2494                         /* Don't pass T_DATA_REQ messages downstream. */
2495                         freemsg(mp);
2496                         return;
2497                 case T_ORDREL_REQ:
2498                         RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
2499                             (void *)q);
2500                         mutex_enter(&mir->mir_mutex);
2501                         if (mir->mir_type != RPC_SERVER) {
2502                                 /*
2503                                  * We are likely being called from
2504                                  * clnt_dispatch_notifyall(). Sending
2505                                  * a T_ORDREL_REQ will result in
2506                                  * a some kind of _IND message being sent,
2507                                  * will be another call to
2508                                  * clnt_dispatch_notifyall(). To keep the stack
2509                                  * lean, queue this message.
2510                                  */
2511                                 mir->mir_inwservice = 1;
2512                                 (void) putq(q, mp);
2513                                 mutex_exit(&mir->mir_mutex);
2514                                 return;
2515                         }
2516 
2517                         /*
2518                          * Mark the structure such that we don't accept any
2519                          * more requests from client. We could defer this
2520                          * until we actually send the orderly release
2521                          * request downstream, but all that does is delay
2522                          * the closing of this stream.
2523                          */
2524                         RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
2525                             " so calling mir_svc_start_close\n", (void *)q);
2526 
2527                         mir_svc_start_close(q, mir);
2528 
2529                         /*
2530                          * If we have sent down a T_ORDREL_REQ, don't send
2531                          * any more.
2532                          */
2533                         if (mir->mir_ordrel_pending) {
2534                                 freemsg(mp);
2535                                 mutex_exit(&mir->mir_mutex);
2536                                 return;
2537                         }
2538 
2539                         /*
2540                          * If the stream is not idle, then we hold the
2541                          * orderly release until it becomes idle.  This
2542                          * ensures that kRPC will be able to reply to
2543                          * all requests that we have passed to it.
2544                          *
2545                          * We also queue the request if there is data already
2546                          * queued, because we cannot allow the T_ORDREL_REQ
2547                          * to go before data. When we had a separate reply
2548                          * count, this was not a problem, because the
2549                          * reply count was reconciled when mir_wsrv()
2550                          * completed.
2551                          */
2552                         if (!MIR_SVC_QUIESCED(mir) ||
2553                             mir->mir_inwservice == 1) {
2554                                 mir->mir_inwservice = 1;
2555                                 (void) putq(q, mp);
2556 
2557                                 RPCLOG(16, "mir_wput_other: queuing "
2558                                     "T_ORDREL_REQ on 0x%p\n", (void *)q);
2559 
2560                                 mutex_exit(&mir->mir_mutex);
2561                                 return;
2562                         }
2563 
2564                         /*
2565                          * Mark the structure so that we know we sent
2566                          * an orderly release request, and reset the idle timer.
2567                          */
2568                         mir->mir_ordrel_pending = 1;
2569 
2570                         RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
2571                             " on 0x%p because we got T_ORDREL_REQ\n",
2572                             (void *)q);
2573 
2574                         mir_svc_idle_start(q, mir);
2575                         mutex_exit(&mir->mir_mutex);
2576 
2577                         /*
2578                          * When we break, we will putnext the T_ORDREL_REQ.
2579                          */
2580                         break;
2581 
2582                 case T_CONN_REQ:
2583                         mutex_enter(&mir->mir_mutex);
2584                         if (mir->mir_head_mp != NULL) {
2585                                 freemsg(mir->mir_head_mp);
2586                                 mir->mir_head_mp = NULL;
2587                                 mir->mir_tail_mp = NULL;
2588                         }
2589                         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2590                         /*
2591                          * Restart timer in case mir_clnt_idle_do_stop() was
2592                          * called.
2593                          */
2594                         mir->mir_idle_timeout = clnt_idle_timeout;
2595                         mir_clnt_idle_stop(q, mir);
2596                         mir_clnt_idle_start(q, mir);
2597                         mutex_exit(&mir->mir_mutex);
2598                         break;
2599 
2600                 default:
2601                         /*
2602                          * T_DISCON_REQ is one of the interesting default
2603                          * cases here. Ideally, an M_FLUSH is done before
2604                          * T_DISCON_REQ is done. However, that is somewhat
2605                          * cumbersome for clnt_cots.c to do. So we queue
2606                          * T_DISCON_REQ, and let the service procedure
2607                          * flush all M_DATA.
2608                          */
2609                         break;
2610                 }
2611                 /* fallthru */;
2612         default:
2613                 if (mp->b_datap->db_type >= QPCTL) {
2614                         if (mp->b_datap->db_type == M_FLUSH) {
2615                                 if (mir->mir_type == RPC_CLIENT &&
2616                                     *mp->b_rptr & FLUSHW) {
2617                                         RPCLOG(32, "mir_wput_other: flushing "
2618                                             "wq 0x%p\n", (void *)q);
2619                                         if (*mp->b_rptr & FLUSHBAND) {
2620                                                 flushband(q, *(mp->b_rptr + 1),
2621                                                     FLUSHDATA);
2622                                         } else {
2623                                                 flushq(q, FLUSHDATA);
2624                                         }
2625                                 } else {
2626                                         RPCLOG(32, "mir_wput_other: ignoring "
2627                                             "M_FLUSH on wq 0x%p\n", (void *)q);
2628                                 }
2629                         }
2630                         break;
2631                 }
2632 
2633                 mutex_enter(&mir->mir_mutex);
2634                 if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
2635                         mutex_exit(&mir->mir_mutex);
2636                         break;
2637                 }
2638                 mir->mir_inwservice = 1;
2639                 mir->mir_inwflushdata = flush_in_svc;
2640                 (void) putq(q, mp);
2641                 mutex_exit(&mir->mir_mutex);
2642                 qenable(q);
2643 
2644                 return;
2645         }
2646         putnext(q, mp);
2647 }
2648 
2649 static void
2650 mir_wsrv(queue_t *q)
2651 {
2652         mblk_t  *mp;
2653         mir_t   *mir;
2654         bool_t flushdata;
2655 
2656         mir = (mir_t *)q->q_ptr;
2657         mutex_enter(&mir->mir_mutex);
2658 
2659         flushdata = mir->mir_inwflushdata;
2660         mir->mir_inwflushdata = 0;
2661 
2662         while (mp = getq(q)) {
2663                 if (mp->b_datap->db_type == M_DATA) {
2664                         /*
2665                          * Do not send any more data if we have sent
2666                          * a T_ORDREL_REQ.
2667                          */
2668                         if (flushdata || mir->mir_ordrel_pending == 1) {
2669                                 freemsg(mp);
2670                                 continue;
2671                         }
2672 
2673                         /*
2674                          * Make sure that the stream can really handle more
2675                          * data.
2676                          */
2677                         if (!MIR_WCANPUTNEXT(mir, q)) {
2678                                 (void) putbq(q, mp);
2679                                 mutex_exit(&mir->mir_mutex);
2680                                 return;
2681                         }
2682 
2683                         /*
2684                          * Now we pass the RPC message downstream.
2685                          */
2686                         mutex_exit(&mir->mir_mutex);
2687                         putnext(q, mp);
2688                         mutex_enter(&mir->mir_mutex);
2689                         continue;
2690                 }
2691 
2692                 /*
2693                  * This is not an RPC message, pass it downstream
2694                  * (ignoring flow control) if the server side is not sending a
2695                  * T_ORDREL_REQ downstream.
2696                  */
2697                 if (mir->mir_type != RPC_SERVER ||
2698                     ((union T_primitives *)mp->b_rptr)->type !=
2699                     T_ORDREL_REQ) {
2700                         mutex_exit(&mir->mir_mutex);
2701                         putnext(q, mp);
2702                         mutex_enter(&mir->mir_mutex);
2703                         continue;
2704                 }
2705 
2706                 if (mir->mir_ordrel_pending == 1) {
2707                         /*
2708                          * Don't send two T_ORDRELs
2709                          */
2710                         freemsg(mp);
2711                         continue;
2712                 }
2713 
2714                 /*
2715                  * Mark the structure so that we know we sent an orderly
2716                  * release request.  We will check to see slot is idle at the
2717                  * end of this routine, and if so, reset the idle timer to
2718                  * handle orderly release timeouts.
2719                  */
2720                 mir->mir_ordrel_pending = 1;
2721                 RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
2722                     (void *)q);
2723                 /*
2724                  * Send the orderly release downstream. If there are other
2725                  * pending replies we won't be able to send them.  However,
2726                  * the only reason we should send the orderly release is if
2727                  * we were idle, or if an unusual event occurred.
2728                  */
2729                 mutex_exit(&mir->mir_mutex);
2730                 putnext(q, mp);
2731                 mutex_enter(&mir->mir_mutex);
2732         }
2733 
2734         if (q->q_first == NULL)
2735                 /*
2736                  * If we call mir_svc_idle_start() below, then
2737                  * clearing mir_inwservice here will also result in
2738                  * any thread waiting in mir_close() to be signaled.
2739                  */
2740                 mir->mir_inwservice = 0;
2741 
2742         if (mir->mir_type != RPC_SERVER) {
2743                 mutex_exit(&mir->mir_mutex);
2744                 return;
2745         }
2746 
2747         /*
2748          * If idle we call mir_svc_idle_start to start the timer (or wakeup
2749          * a close). Also make sure not to start the idle timer on the
2750          * listener stream. This can cause nfsd to send an orderly release
2751          * command on the listener stream.
2752          */
2753         if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
2754                 RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
2755                     "because mir slot is idle\n", (void *)q);
2756                 mir_svc_idle_start(q, mir);
2757         }
2758 
2759         /*
2760          * If outbound flow control has been relieved, then allow new
2761          * inbound requests to be processed.
2762          */
2763         if (mir->mir_hold_inbound) {
2764                 mir->mir_hold_inbound = 0;
2765                 qenable(RD(q));
2766         }
2767         mutex_exit(&mir->mir_mutex);
2768 }
2769 
2770 static void
2771 mir_disconnect(queue_t *q, mir_t *mir)
2772 {
2773         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2774 
2775         switch (mir->mir_type) {
2776         case RPC_CLIENT:
2777                 /*
2778                  * We are disconnecting, but not necessarily
2779                  * closing. By not closing, we will fail to
2780                  * pick up a possibly changed global timeout value,
2781                  * unless we store it now.
2782                  */
2783                 mir->mir_idle_timeout = clnt_idle_timeout;
2784                 mir_clnt_idle_start(WR(q), mir);
2785                 mutex_exit(&mir->mir_mutex);
2786 
2787                 /*
2788                  * T_DISCON_REQ is passed to kRPC as an integer value
2789                  * (this is not a TPI message).  It is used as a
2790                  * convenient value to indicate a sanity check
2791                  * failure -- the same kRPC routine is also called
2792                  * for T_DISCON_INDs and T_ORDREL_INDs.
2793                  */
2794                 clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
2795                 break;
2796 
2797         case RPC_SERVER:
2798                 mir->mir_svc_no_more_msgs = 1;
2799                 mir_svc_idle_stop(WR(q), mir);
2800                 mutex_exit(&mir->mir_mutex);
2801                 RPCLOG(16, "mir_disconnect: telling "
2802                     "stream head listener to disconnect stream "
2803                     "(0x%p)\n", (void *) q);
2804                 (void) mir_svc_policy_notify(q, 2);
2805                 break;
2806 
2807         default:
2808                 mutex_exit(&mir->mir_mutex);
2809                 break;
2810         }
2811 }
2812 
2813 /*
2814  * Sanity check the message length, and if it's too large, shutdown the
2815  * connection.  Returns 1 if the connection is shutdown; 0 otherwise.
2816  */
2817 static int
2818 mir_check_len(queue_t *q, mblk_t *head_mp)
2819 {
2820         mir_t *mir = q->q_ptr;
2821         uint_t maxsize = 0;
2822         size_t msg_len = msgdsize(head_mp);
2823 
2824         if (mir->mir_max_msg_sizep != NULL)
2825                 maxsize = *mir->mir_max_msg_sizep;
2826 
2827         if (maxsize == 0 || msg_len <= maxsize)
2828                 return (0);
2829 
2830         freemsg(head_mp);
2831         mir->mir_head_mp = NULL;
2832         mir->mir_tail_mp = NULL;
2833         mir->mir_frag_header = 0;
2834         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
2835         if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
2836                 cmn_err(CE_NOTE,
2837                     "kRPC: record fragment from %s of size(%lu) exceeds "
2838                     "maximum (%u). Disconnecting",
2839                     (mir->mir_type == RPC_CLIENT) ? "server" :
2840                     (mir->mir_type == RPC_SERVER) ? "client" :
2841                     "test tool", msg_len, maxsize);
2842         }
2843 
2844         mir_disconnect(q, mir);
2845         return (1);
2846 }