1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  24  */
  25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  26 /* All Rights Reserved */
  27 /*
  28  * Portions of this source code were derived from Berkeley
  29  * 4.3 BSD under license from the Regents of the University of
  30  * California.
  31  */
  32 
  33 /*
  34  * Server side of RPC over RDMA in the kernel.
  35  */
  36 
  37 #include <sys/param.h>
  38 #include <sys/types.h>
  39 #include <sys/user.h>
  40 #include <sys/sysmacros.h>
  41 #include <sys/proc.h>
  42 #include <sys/file.h>
  43 #include <sys/errno.h>
  44 #include <sys/kmem.h>
  45 #include <sys/debug.h>
  46 #include <sys/systm.h>
  47 #include <sys/cmn_err.h>
  48 #include <sys/kstat.h>
  49 #include <sys/vtrace.h>
  50 #include <sys/debug.h>
  51 
  52 #include <rpc/types.h>
  53 #include <rpc/xdr.h>
  54 #include <rpc/auth.h>
  55 #include <rpc/clnt.h>
  56 #include <rpc/rpc_msg.h>
  57 #include <rpc/svc.h>
  58 #include <rpc/rpc_rdma.h>
  59 #include <sys/ddi.h>
  60 #include <sys/sunddi.h>
  61 
  62 #include <inet/common.h>
  63 #include <inet/ip.h>
  64 #include <inet/ip6.h>
  65 
  66 #include <nfs/nfs.h>
  67 #include <sys/sdt.h>
  68 
  69 #define SVC_RDMA_SUCCESS 0
  70 #define SVC_RDMA_FAIL -1
  71 
  72 #define SVC_CREDIT_FACTOR (0.5)
  73 
  74 #define MSG_IS_RPCSEC_GSS(msg)          \
  75         ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)
  76 
  77 
  78 uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
  79 
  80 /*
  81  * RDMA transport specific data associated with SVCMASTERXPRT
  82  */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to owning SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* listener state: queue, svcid, */
					/* active flag and err_code */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};
  88 
  89 /*
  90  * Plugin connection specific data stashed away in clone SVCXPRT
  91  */
struct clone_rdma_data {
	bool_t		cloned;		/* TRUE once this clone owns conn/bufs */
					/* (see svc_rdma_kclone_xprt/destroy) */
	CONN		*conn;		/* RDMA connection for this request */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};
  99 
 100 
 101 #define MAXADDRLEN      128     /* max length for address mask */
 102 
 103 /*
 104  * Routines exported through ops vector.
 105  */
 106 static bool_t           svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
 107 static bool_t           svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
 108 static bool_t           svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
 109 static bool_t           svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
 110 void                    svc_rdma_kdestroy(SVCMASTERXPRT *);
 111 static int              svc_rdma_kdup(struct svc_req *, caddr_t, int,
 112                                 struct dupreq **, bool_t *);
 113 static void             svc_rdma_kdupdone(struct dupreq *, caddr_t,
 114                                 void (*)(), int, int);
 115 static int32_t          *svc_rdma_kgetres(SVCXPRT *, int);
 116 static void             svc_rdma_kfreeres(SVCXPRT *);
 117 static void             svc_rdma_kclone_destroy(SVCXPRT *);
 118 static void             svc_rdma_kstart(SVCMASTERXPRT *);
 119 void                    svc_rdma_kstop(SVCMASTERXPRT *);
 120 static void             svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
 121 static void             svc_rdma_ktattrs(SVCXPRT *, int, void **);
 122 
 123 static int      svc_process_long_reply(SVCXPRT *, xdrproc_t,
 124                         caddr_t, struct rpc_msg *, bool_t, int *,
 125                         int *, int *, unsigned int *);
 126 
 127 static int      svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
 128                         caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
 129                         bool_t, uint_t *);
 130 static bool_t rpcmsg_length(xdrproc_t,
 131                 caddr_t,
 132                 struct rpc_msg *, bool_t, int);
 133 
 134 /*
 135  * Server transport operations vector.
 136  */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy,	/* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs	/* Get Transport Attributes */
};
 152 
 153 /*
 154  * Server statistics
 155  * NOTE: This structure type is duplicated in the NFS fast path.
 156  */
/* kstat counters; updated via RSSTAT_INCR() (atomic 64-bit increments) */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};
 182 
 183 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
 184 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
 185 
 186 #define RSSTAT_INCR(x)  atomic_inc_64(&rdmarsstat.x.value.ui64)
 187 /*
 188  * Create a transport record.
 189  * The transport record, output buffer, and private data structure
 190  * are allocated.  The output buffer is serialized into using xdrmem.
 191  * There is one transport record per user process which implements a
 192  * set of services.
 193  */
 194 /* ARGSUSED */
 195 int
 196 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
 197     rdma_xprt_group_t *started_xprts)
 198 {
 199         int error;
 200         SVCMASTERXPRT *xprt;
 201         struct rdma_data *rd;
 202         rdma_registry_t *rmod;
 203         rdma_xprt_record_t *xprt_rec;
 204         queue_t *q;
 205         /*
 206          * modload the RDMA plugins is not already done.
 207          */
 208         if (!rdma_modloaded) {
 209                 /*CONSTANTCONDITION*/
 210                 ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
 211 
 212                 mutex_enter(&rdma_modload_lock);
 213                 if (!rdma_modloaded) {
 214                         error = rdma_modload();
 215                 }
 216                 mutex_exit(&rdma_modload_lock);
 217 
 218                 if (error)
 219                         return (error);
 220         }
 221 
 222         /*
 223          * master_xprt_count is the count of master transport handles
 224          * that were successfully created and are ready to recieve for
 225          * RDMA based access.
 226          */
 227         error = 0;
 228         xprt_rec = NULL;
 229         rw_enter(&rdma_lock, RW_READER);
 230         if (rdma_mod_head == NULL) {
 231                 started_xprts->rtg_count = 0;
 232                 rw_exit(&rdma_lock);
 233                 if (rdma_dev_available)
 234                         return (EPROTONOSUPPORT);
 235                 else
 236                         return (ENODEV);
 237         }
 238 
 239         /*
 240          * If we have reached here, then atleast one RDMA plugin has loaded.
 241          * Create a master_xprt, make it start listenining on the device,
 242          * if an error is generated, record it, we might need to shut
 243          * the master_xprt.
 244          * SVC_START() calls svc_rdma_kstart which calls plugin binding
 245          * routines.
 246          */
 247         for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
 248 
 249                 /*
 250                  * One SVCMASTERXPRT per RDMA plugin.
 251                  */
 252                 xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
 253                 xprt->xp_ops = &rdma_svc_ops;
 254                 xprt->xp_sct = sct;
 255                 xprt->xp_type = T_RDMA;
 256                 mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
 257                 mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 258                 xprt->xp_req_head = (mblk_t *)0;
 259                 xprt->xp_req_tail = (mblk_t *)0;
 260                 xprt->xp_full = FALSE;
 261                 xprt->xp_enable = FALSE;
 262                 xprt->xp_reqs = 0;
 263                 xprt->xp_size = 0;
 264                 xprt->xp_threads = 0;
 265                 xprt->xp_detached_threads = 0;
 266 
 267                 rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
 268                 xprt->xp_p2 = (caddr_t)rd;
 269                 rd->rd_xprt = xprt;
 270                 rd->r_mod = rmod->r_mod;
 271 
 272                 q = &rd->rd_data.q;
 273                 xprt->xp_wq = q;
 274                 q->q_ptr = &rd->rd_xprt;
 275                 xprt->xp_netid = NULL;
 276 
 277                 /*
 278                  * Each of the plugins will have their own Service ID
 279                  * to listener specific mapping, like port number for VI
 280                  * and service name for IB.
 281                  */
 282                 rd->rd_data.svcid = id;
 283                 error = svc_xprt_register(xprt, id);
 284                 if (error) {
 285                         DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
 286                         goto cleanup;
 287                 }
 288 
 289                 SVC_START(xprt);
 290                 if (!rd->rd_data.active) {
 291                         svc_xprt_unregister(xprt);
 292                         error = rd->rd_data.err_code;
 293                         goto cleanup;
 294                 }
 295 
 296                 /*
 297                  * This is set only when there is atleast one or more
 298                  * transports successfully created. We insert the pointer
 299                  * to the created RDMA master xprt into a separately maintained
 300                  * list. This way we can easily reference it later to cleanup,
 301                  * when NFS kRPC service pool is going away/unregistered.
 302                  */
 303                 started_xprts->rtg_count ++;
 304                 xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
 305                 xprt_rec->rtr_xprt_ptr = xprt;
 306                 xprt_rec->rtr_next = started_xprts->rtg_listhead;
 307                 started_xprts->rtg_listhead = xprt_rec;
 308                 continue;
 309 cleanup:
 310                 SVC_DESTROY(xprt);
 311                 if (error == RDMA_FAILED)
 312                         error = EPROTONOSUPPORT;
 313         }
 314 
 315         rw_exit(&rdma_lock);
 316 
 317         /*
 318          * Don't return any error even if a single plugin was started
 319          * successfully.
 320          */
 321         if (started_xprts->rtg_count == 0)
 322                 return (error);
 323         return (0);
 324 }
 325 
 326 /*
 327  * Cleanup routine for freeing up memory allocated by
 328  * svc_rdma_kcreate()
 329  */
 330 void
 331 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
 332 {
 333         struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
 334 
 335 
 336         mutex_destroy(&xprt->xp_req_lock);
 337         mutex_destroy(&xprt->xp_thread_lock);
 338         kmem_free(rd, sizeof (*rd));
 339         kmem_free(xprt, sizeof (*xprt));
 340 }
 341 
 342 
 343 static void
 344 svc_rdma_kstart(SVCMASTERXPRT *xprt)
 345 {
 346         struct rdma_svc_data *svcdata;
 347         rdma_mod_t *rmod;
 348 
 349         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 350         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 351 
 352         /*
 353          * Create a listener for  module at this port
 354          */
 355 
 356         if (rmod->rdma_count != 0)
 357                 (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
 358         else
 359                 svcdata->err_code = RDMA_FAILED;
 360 }
 361 
 362 void
 363 svc_rdma_kstop(SVCMASTERXPRT *xprt)
 364 {
 365         struct rdma_svc_data *svcdata;
 366         rdma_mod_t *rmod;
 367 
 368         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 369         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 370 
 371         /*
 372          * Call the stop listener routine for each plugin. If rdma_count is
 373          * already zero set active to zero.
 374          */
 375         if (rmod->rdma_count != 0)
 376                 (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
 377         else
 378                 svcdata->active = 0;
 379         if (svcdata->active)
 380                 DTRACE_PROBE(krpc__e__svcrdma__kstop);
 381 }
 382 
 383 /* ARGSUSED */
 384 static void
 385 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
 386 {
 387 
 388         struct clone_rdma_data *cdrp;
 389         cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 390 
 391         /*
 392          * Only free buffers and release connection when cloned is set.
 393          */
 394         if (cdrp->cloned != TRUE)
 395                 return;
 396 
 397         rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
 398         if (cdrp->cl_reply) {
 399                 clist_free(cdrp->cl_reply);
 400                 cdrp->cl_reply = NULL;
 401         }
 402         RDMA_REL_CONN(cdrp->conn);
 403 
 404         cdrp->cloned = 0;
 405 }
 406 
 407 /*
 408  * Clone the xprt specific information.  It will be freed by
 409  * SVC_CLONE_DESTROY.
 410  */
 411 static void
 412 svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
 413 {
 414         struct clone_rdma_data *srcp2;
 415         struct clone_rdma_data *dstp2;
 416 
 417         srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
 418         dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
 419 
 420         if (srcp2->conn != NULL) {
 421                 srcp2->cloned = TRUE;
 422                 *dstp2 = *srcp2;
 423         }
 424 }
 425 
 426 static void
 427 svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
 428 {
 429         CONN    *conn;
 430         *tattr = NULL;
 431 
 432         switch (attrflag) {
 433         case SVC_TATTR_ADDRMASK:
 434                 conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
 435                 ASSERT(conn != NULL);
 436                 if (conn)
 437                         *tattr = (void *)&conn->c_addrmask;
 438         }
 439 }
 440 
 441 static bool_t
 442 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
 443 {
 444         XDR     *xdrs;
 445         CONN    *conn;
 446         rdma_recv_data_t        *rdp = (rdma_recv_data_t *)mp->b_rptr;
 447         struct clone_rdma_data *crdp;
 448         struct clist    *cl = NULL;
 449         struct clist    *wcl = NULL;
 450         struct clist    *cllong = NULL;
 451 
 452         rdma_stat       status;
 453         uint32_t vers, op, pos, xid;
 454         uint32_t rdma_credit;
 455         uint32_t wcl_total_length = 0;
 456         bool_t  wwl = FALSE;
 457 
 458         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 459         RSSTAT_INCR(rscalls);
 460         conn = rdp->conn;
 461 
 462         status = rdma_svc_postrecv(conn);
 463         if (status != RDMA_SUCCESS) {
 464                 DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
 465                 goto badrpc_call;
 466         }
 467 
 468         xdrs = &clone_xprt->xp_xdrin;
 469         xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
 470         xid = *(uint32_t *)rdp->rpcmsg.addr;
 471         XDR_SETPOS(xdrs, sizeof (uint32_t));
 472 
 473         if (! xdr_u_int(xdrs, &vers) ||
 474             ! xdr_u_int(xdrs, &rdma_credit) ||
 475             ! xdr_u_int(xdrs, &op)) {
 476                 DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
 477                 goto xdr_err;
 478         }
 479 
 480         /* Checking if the status of the recv operation was normal */
 481         if (rdp->status != 0) {
 482                 DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
 483                     int, rdp->status);
 484                 goto badrpc_call;
 485         }
 486 
 487         if (! xdr_do_clist(xdrs, &cl)) {
 488                 DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
 489                 goto xdr_err;
 490         }
 491 
 492         if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
 493                 DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
 494                 if (cl)
 495                         clist_free(cl);
 496                 goto xdr_err;
 497         }
 498         crdp->cl_wlist = wcl;
 499 
 500         crdp->cl_reply = NULL;
 501         (void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);
 502 
 503         /*
 504          * A chunk at 0 offset indicates that the RPC call message
 505          * is in a chunk. Get the RPC call message chunk.
 506          */
 507         if (cl != NULL && op == RDMA_NOMSG) {
 508 
 509                 /* Remove RPC call message chunk from chunklist */
 510                 cllong = cl;
 511                 cl = cl->c_next;
 512                 cllong->c_next = NULL;
 513 
 514 
 515                 /* Allocate and register memory for the RPC call msg chunk */
 516                 cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
 517                 cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
 518                     cllong->c_len : LONG_REPLY_LEN;
 519 
 520                 if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
 521                         clist_free(cllong);
 522                         goto cll_malloc_err;
 523                 }
 524 
 525                 cllong->u.c_daddr3 = cllong->rb_longbuf.addr;
 526 
 527                 if (cllong->u.c_daddr == NULL) {
 528                         DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
 529                         rdma_buf_free(conn, &cllong->rb_longbuf);
 530                         clist_free(cllong);
 531                         goto cll_malloc_err;
 532                 }
 533 
 534                 status = clist_register(conn, cllong, CLIST_REG_DST);
 535                 if (status) {
 536                         DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
 537                         rdma_buf_free(conn, &cllong->rb_longbuf);
 538                         clist_free(cllong);
 539                         goto cll_malloc_err;
 540                 }
 541 
 542                 /*
 543                  * Now read the RPC call message in
 544                  */
 545                 status = RDMA_READ(conn, cllong, WAIT);
 546                 if (status) {
 547                         DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
 548                         (void) clist_deregister(conn, cllong);
 549                         rdma_buf_free(conn, &cllong->rb_longbuf);
 550                         clist_free(cllong);
 551                         goto cll_malloc_err;
 552                 }
 553 
 554                 status = clist_syncmem(conn, cllong, CLIST_REG_DST);
 555                 (void) clist_deregister(conn, cllong);
 556 
 557                 xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
 558                     cllong->c_len, 0, cl, XDR_DECODE, conn);
 559 
 560                 crdp->rpcbuf = cllong->rb_longbuf;
 561                 crdp->rpcbuf.len = cllong->c_len;
 562                 clist_free(cllong);
 563                 RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 564         } else {
 565                 pos = XDR_GETPOS(xdrs);
 566                 xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
 567                     rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
 568                 crdp->rpcbuf = rdp->rpcmsg;
 569 
 570                 /* Use xdrrdmablk_ops to indicate there is a read chunk list */
 571                 if (cl != NULL) {
 572                         int32_t flg = XDR_RDMA_RLIST_REG;
 573 
 574                         XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 575                         xdrs->x_ops = &xdrrdmablk_ops;
 576                 }
 577         }
 578 
 579         if (crdp->cl_wlist) {
 580                 int32_t flg = XDR_RDMA_WLIST_REG;
 581 
 582                 XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
 583                 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 584         }
 585 
 586         if (! xdr_callmsg(xdrs, msg)) {
 587                 DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
 588                 RSSTAT_INCR(rsxdrcall);
 589                 goto callmsg_err;
 590         }
 591 
 592         /*
 593          * Point the remote transport address in the service_transport
 594          * handle at the address in the request.
 595          */
 596         clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
 597         clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
 598         clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
 599 
 600         clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
 601         clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
 602         clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;
 603 
 604         /*
 605          * In case of RDMA, connection management is
 606          * entirely done in rpcib module and netid in the
 607          * SVCMASTERXPRT is NULL. Initialize the clone netid
 608          * from the connection.
 609          */
 610 
 611         clone_xprt->xp_netid = conn->c_netid;
 612 
 613         clone_xprt->xp_xid = xid;
 614         crdp->conn = conn;
 615 
 616         freeb(mp);
 617 
 618         return (TRUE);
 619 
 620 callmsg_err:
 621         rdma_buf_free(conn, &crdp->rpcbuf);
 622 
 623 cll_malloc_err:
 624         if (cl)
 625                 clist_free(cl);
 626 xdr_err:
 627         XDR_DESTROY(xdrs);
 628 
 629 badrpc_call:
 630         RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 631         RDMA_REL_CONN(conn);
 632         freeb(mp);
 633         RSSTAT_INCR(rsbadcalls);
 634         return (FALSE);
 635 }
 636 
/*
 * Send a reply that does not fit inline: encode the full reply message
 * into a long buffer, map it onto the client-provided reply (write)
 * chunk list, RDMA WRITE it to the client, and report the final encoded
 * length and chunk count back to the caller.
 *
 * Returns SVC_RDMA_SUCCESS or SVC_RDMA_FAIL.  On failure the long
 * buffer has been freed; on success it is freed here after the WRITE.
 */
static int
svc_process_long_reply(SVCXPRT * clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char  *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		/* leave headroom for the RPCSEC_GSS verifier/wrapping */
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	/*
	 * Round the allocation up to 16K/32K/64K buckets; estimates above
	 * 64K are used as-is (already rounded by RNDUP).
	 */
	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	/* Encode the entire reply (header + wrapped results) into memp. */
	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	/*
	 * NOTE(review): this assumes crdp->cl_reply is non-NULL (the
	 * client supplied a reply chunk list) — a NULL here would be
	 * dereferenced below.  Presumably the caller guarantees this;
	 * confirm against svc_rdma_ksend.
	 */
	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	/*
	 * Walk the reply chunk list, carving the encoded reply into the
	 * client-provided segments; stop at an unregistered segment
	 * (mrc_rmr == 0) or when all bytes are assigned.
	 */
	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks +=  1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there are still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	/* Push the reply to the client's memory. */
	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}
 771 
 772 
 773 static int
 774 svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
 775     caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
 776     struct rpc_msg *msg, bool_t has_args, uint_t *len)
 777 {
 778         /*
 779          * Get a pre-allocated buffer for rpc reply
 780          */
 781         rpcreply->type = SEND_BUFFER;
 782         if (rdma_buf_alloc(conn, rpcreply)) {
 783                 DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
 784                 return (SVC_RDMA_FAIL);
 785         }
 786 
 787         xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
 788             0, NULL, XDR_ENCODE, conn);
 789 
 790         msg->rm_xid = clone_xprt->xp_xid;
 791 
 792         if (has_args) {
 793                 if (!(xdr_replymsg(*xdrs, msg) &&
 794                     (!has_args ||
 795                     SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
 796                     xdr_results, xdr_location)))) {
 797                         rdma_buf_free(conn, rpcreply);
 798                         DTRACE_PROBE(
 799                             krpc__e__svcrdma__rpcmsg__reply__authwrap1);
 800                         return (SVC_RDMA_FAIL);
 801                 }
 802         } else {
 803                 if (!xdr_replymsg(*xdrs, msg)) {
 804                         rdma_buf_free(conn, rpcreply);
 805                         DTRACE_PROBE(
 806                             krpc__e__svcrdma__rpcmsg__reply__authwrap2);
 807                         return (SVC_RDMA_FAIL);
 808                 }
 809         }
 810 
 811         *len = XDR_GETPOS(*xdrs);
 812 
 813         return (SVC_RDMA_SUCCESS);
 814 }
 815 
 816 /*
 817  * Send rpc reply.
 818  */
 819 static bool_t
 820 svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
 821 {
 822         XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
 823         XDR xdrs_rhdr;
 824         CONN *conn = NULL;
 825         rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};
 826 
 827         struct clone_rdma_data *crdp;
 828         struct clist *cl_read = NULL;
 829         struct clist *cl_send = NULL;
 830         struct clist *cl_write = NULL;
 831         xdrproc_t xdr_results;          /* results XDR encoding function */
 832         caddr_t xdr_location;           /* response results pointer */
 833 
 834         int retval = FALSE;
 835         int status, msglen, num_wreply_segments = 0;
 836         uint32_t rdma_credit = 0;
 837         int freelen = 0;
 838         bool_t has_args;
 839         uint_t  final_resp_len, rdma_response_op, vers;
 840 
 841         bzero(&xdrs_rhdr, sizeof (XDR));
 842         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 843         conn = crdp->conn;
 844 
 845         /*
 846          * If there is a result procedure specified in the reply message,
 847          * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
 848          * We need to make sure it won't be processed twice, so we null
 849          * it for xdr_replymsg here.
 850          */
 851         has_args = FALSE;
 852         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
 853             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
 854                 if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
 855                         has_args = TRUE;
 856                         xdr_location = msg->acpted_rply.ar_results.where;
 857                         msg->acpted_rply.ar_results.proc = xdr_void;
 858                         msg->acpted_rply.ar_results.where = NULL;
 859                 }
 860         }
 861 
 862         /*
 863          * Given the limit on the inline response size (RPC_MSG_SZ),
 864          * there is a need to make a guess as to the overall size of
 865          * the response.  If the resultant size is beyond the inline
 866          * size, then the server needs to use the "reply chunk list"
 867          * provided by the client (if the client provided one).  An
 868          * example of this type of response would be a READDIR
 869          * response (e.g. a small directory read would fit in RPC_MSG_SZ
 870          * and that is the preference but it may not fit)
 871          *
 872          * Combine the encoded size and the size of the true results
 873          * and then make the decision about where to encode and send results.
 874          *
 875          * One important note, this calculation is ignoring the size
 876          * of the encoding of the authentication overhead.  The reason
 877          * for this is rooted in the complexities of access to the
 878          * encoded size of RPCSEC_GSS related authentiation,
 879          * integrity, and privacy.
 880          *
 881          * If it turns out that the encoded authentication bumps the
 882          * response over the RPC_MSG_SZ limit, then it may need to
 883          * attempt to encode for the reply chunk list.
 884          */
 885 
 886         /*
 887          * Calculating the "sizeof" the RPC response header and the
 888          * encoded results.
 889          */
 890         msglen = xdr_sizeof(xdr_replymsg, msg);
 891 
 892         if (msglen > 0) {
 893                 RSSTAT_INCR(rstotalreplies);
 894         }
 895         if (has_args)
 896                 msglen += xdrrdma_sizeof(xdr_results, xdr_location,
 897                     rdma_minchunk, NULL, NULL);
 898 
 899         DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);
 900 
 901         status = SVC_RDMA_SUCCESS;
 902 
 903         if (msglen < RPC_MSG_SZ) {
 904                 /*
 905                  * Looks like the response will fit in the inline
 906                  * response; let's try
 907                  */
 908                 RSSTAT_INCR(rstotalinlinereplies);
 909 
 910                 rdma_response_op = RDMA_MSG;
 911 
 912                 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
 913                     xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
 914                     has_args, &final_resp_len);
 915 
 916                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
 917                     int, status);
 918                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
 919                     int, final_resp_len);
 920 
 921                 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
 922                         clist_free(crdp->cl_reply);
 923                         crdp->cl_reply = NULL;
 924                 }
 925         }
 926 
 927         /*
 928          * If the encode failed (size?) or the message really is
 929          * larger than what is allowed, try the response chunk list.
 930          */
 931         if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
 932                 /*
 933                  * attempting to use a reply chunk list when there
 934                  * isn't one won't get very far...
 935                  */
 936                 if (crdp->cl_reply == NULL) {
 937                         DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
 938                         goto out;
 939                 }
 940 
 941                 RSSTAT_INCR(rstotallongreplies);
 942 
 943                 msglen = xdr_sizeof(xdr_replymsg, msg);
 944                 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
 945                     NULL, NULL);
 946 
 947                 status = svc_process_long_reply(clone_xprt, xdr_results,
 948                     xdr_location, msg, has_args, &msglen, &freelen,
 949                     &num_wreply_segments, &final_resp_len);
 950 
 951                 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
 952                     int, final_resp_len);
 953 
 954                 if (status != SVC_RDMA_SUCCESS) {
 955                         DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
 956                         goto out;
 957                 }
 958 
 959                 rdma_response_op = RDMA_NOMSG;
 960         }
 961 
 962         DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
 963             int, final_resp_len);
 964 
 965         rbuf_resp.type = SEND_BUFFER;
 966         if (rdma_buf_alloc(conn, &rbuf_resp)) {
 967                 rdma_buf_free(conn, &rbuf_rpc_resp);
 968                 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
 969                 goto out;
 970         }
 971 
 972         rdma_credit = rdma_bufs_granted;
 973 
 974         vers = RPCRDMA_VERS;
 975         xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
 976         (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
 977         /* Skip xid and set the xdr position accordingly. */
 978         XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
 979         if (!xdr_u_int(&xdrs_rhdr, &vers) ||
 980             !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
 981             !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
 982                 rdma_buf_free(conn, &rbuf_rpc_resp);
 983                 rdma_buf_free(conn, &rbuf_resp);
 984                 DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
 985                 goto out;
 986         }
 987 
 988         /*
 989          * Now XDR the read chunk list, actually always NULL
 990          */
 991         (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);
 992 
 993         /*
 994          * encode write list -- we already drove RDMA_WRITEs
 995          */
 996         cl_write = crdp->cl_wlist;
 997         if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
 998                 DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
 999                 rdma_buf_free(conn, &rbuf_rpc_resp);
1000                 rdma_buf_free(conn, &rbuf_resp);
1001                 goto out;
1002         }
1003 
1004         /*
1005          * XDR encode the RDMA_REPLY write chunk
1006          */
1007         if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
1008             num_wreply_segments)) {
1009                 rdma_buf_free(conn, &rbuf_rpc_resp);
1010                 rdma_buf_free(conn, &rbuf_resp);
1011                 goto out;
1012         }
1013 
1014         clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
1015             rbuf_resp.addr, NULL, NULL);
1016 
1017         if (rdma_response_op == RDMA_MSG) {
1018                 clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
1019                     rbuf_rpc_resp.addr, NULL, NULL);
1020         }
1021 
1022         status = RDMA_SEND(conn, cl_send, msg->rm_xid);
1023 
1024         if (status == RDMA_SUCCESS) {
1025                 retval = TRUE;
1026         }
1027 
1028 out:
1029         /*
1030          * Free up sendlist chunks
1031          */
1032         if (cl_send != NULL)
1033                 clist_free(cl_send);
1034 
1035         /*
1036          * Destroy private data for xdr rdma
1037          */
1038         if (clone_xprt->xp_xdrout.x_ops != NULL) {
1039                 XDR_DESTROY(&(clone_xprt->xp_xdrout));
1040         }
1041 
1042         if (crdp->cl_reply) {
1043                 clist_free(crdp->cl_reply);
1044                 crdp->cl_reply = NULL;
1045         }
1046 
1047         /*
1048          * This is completely disgusting.  If public is set it is
1049          * a pointer to a structure whose first field is the address
1050          * of the function to free that structure and any related
1051          * stuff.  (see rrokfree in nfs_xdr.c).
1052          */
1053         if (xdrs_rpc->x_public) {
1054                 /* LINTED pointer alignment */
1055                 (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
1056         }
1057 
1058         if (xdrs_rhdr.x_ops != NULL) {
1059                 XDR_DESTROY(&xdrs_rhdr);
1060         }
1061 
1062         return (retval);
1063 }
1064 
1065 /*
1066  * Deserialize arguments.
1067  */
1068 static bool_t
1069 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1070 {
1071         if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1072             xdr_args, args_ptr)) != TRUE)
1073                 return (FALSE);
1074         return (TRUE);
1075 }
1076 
1077 static bool_t
1078 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1079     caddr_t args_ptr)
1080 {
1081         struct clone_rdma_data *crdp;
1082         bool_t retval;
1083 
1084         /*
1085          * If the cloned bit is true, then this transport specific
1086          * rmda data has been duplicated into another cloned xprt. Do
1087          * not free, or release the connection, it is still in use.  The
1088          * buffers will be freed and the connection released later by
1089          * SVC_CLONE_DESTROY().
1090          */
1091         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1092         if (crdp->cloned == TRUE) {
1093                 crdp->cloned = 0;
1094                 return (TRUE);
1095         }
1096 
1097         /*
1098          * Free the args if needed then XDR_DESTROY
1099          */
1100         if (args_ptr) {
1101                 XDR     *xdrs = &clone_xprt->xp_xdrin;
1102 
1103                 xdrs->x_op = XDR_FREE;
1104                 retval = (*xdr_args)(xdrs, args_ptr);
1105         }
1106 
1107         XDR_DESTROY(&(clone_xprt->xp_xdrin));
1108         rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1109         if (crdp->cl_reply) {
1110                 clist_free(crdp->cl_reply);
1111                 crdp->cl_reply = NULL;
1112         }
1113         RDMA_REL_CONN(crdp->conn);
1114 
1115         return (retval);
1116 }
1117 
/* ARGSUSED */
/*
 * Get a pre-allocated result buffer.  Not supported for the RDMA
 * transport: always returns NULL so the caller falls back to normal
 * result encoding via svc_rdma_ksend().
 */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}
1124 
/* ARGSUSED */
/*
 * Free a pre-allocated result buffer.  A no-op, since
 * svc_rdma_kgetres() never hands one out.
 */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}
1130 
1131 /*
1132  * the dup cacheing routines below provide a cache of non-failure
1133  * transaction id's.  rpc service routines can use this to detect
1134  * retransmissions and re-send a non-failure response.
1135  */
1136 
1137 /*
1138  * MAXDUPREQS is the number of cached items.  It should be adjusted
1139  * to the service load so that there is likely to be a response entry
1140  * when the first retransmission comes in.
1141  */
1142 #define MAXDUPREQS      1024
1143 
1144 /*
1145  * This should be appropriately scaled to MAXDUPREQS.
1146  */
1147 #define DRHASHSZ        257
1148 
1149 #if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
1150 #define XIDHASH(xid)    ((xid) & (DRHASHSZ - 1))
1151 #else
1152 #define XIDHASH(xid)    ((xid) % DRHASHSZ)
1153 #endif
1154 #define DRHASH(dr)      XIDHASH((dr)->dr_xid)
1155 #define REQTOXID(req)   ((req)->rq_xprt->xp_xid)
1156 
1157 static int      rdmandupreqs = 0;
1158 int     rdmamaxdupreqs = MAXDUPREQS;
1159 static kmutex_t rdmadupreq_lock;
1160 static struct dupreq *rdmadrhashtbl[DRHASHSZ];
1161 static int      rdmadrhashstat[DRHASHSZ];
1162 
1163 static void unhash(struct dupreq *);
1164 
1165 /*
1166  * rdmadrmru points to the head of a circular linked list in lru order.
1167  * rdmadrmru->dr_next == drlru
1168  */
1169 struct dupreq *rdmadrmru;
1170 
1171 /*
1172  * svc_rdma_kdup searches the request cache and returns 0 if the
1173  * request is not found in the cache.  If it is found, then it
1174  * returns the state of the request (in progress or done) and
1175  * the status or attributes that were part of the original reply.
1176  */
1177 static int
1178 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1179         bool_t *dupcachedp)
1180 {
1181         struct dupreq *dr;
1182         uint32_t xid;
1183         uint32_t drhash;
1184         int status;
1185 
1186         xid = REQTOXID(req);
1187         mutex_enter(&rdmadupreq_lock);
1188         RSSTAT_INCR(rsdupchecks);
1189         /*
1190          * Check to see whether an entry already exists in the cache.
1191          */
1192         dr = rdmadrhashtbl[XIDHASH(xid)];
1193         while (dr != NULL) {
1194                 if (dr->dr_xid == xid &&
1195                     dr->dr_proc == req->rq_proc &&
1196                     dr->dr_prog == req->rq_prog &&
1197                     dr->dr_vers == req->rq_vers &&
1198                     dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1199                     bcmp((caddr_t)dr->dr_addr.buf,
1200                     (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1201                     dr->dr_addr.len) == 0) {
1202                         status = dr->dr_status;
1203                         if (status == DUP_DONE) {
1204                                 bcopy(dr->dr_resp.buf, res, size);
1205                                 if (dupcachedp != NULL)
1206                                         *dupcachedp = (dr->dr_resfree != NULL);
1207                         } else {
1208                                 dr->dr_status = DUP_INPROGRESS;
1209                                 *drpp = dr;
1210                         }
1211                         RSSTAT_INCR(rsdupreqs);
1212                         mutex_exit(&rdmadupreq_lock);
1213                         return (status);
1214                 }
1215                 dr = dr->dr_chain;
1216         }
1217 
1218         /*
1219          * There wasn't an entry, either allocate a new one or recycle
1220          * an old one.
1221          */
1222         if (rdmandupreqs < rdmamaxdupreqs) {
1223                 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1224                 if (dr == NULL) {
1225                         mutex_exit(&rdmadupreq_lock);
1226                         return (DUP_ERROR);
1227                 }
1228                 dr->dr_resp.buf = NULL;
1229                 dr->dr_resp.maxlen = 0;
1230                 dr->dr_addr.buf = NULL;
1231                 dr->dr_addr.maxlen = 0;
1232                 if (rdmadrmru) {
1233                         dr->dr_next = rdmadrmru->dr_next;
1234                         rdmadrmru->dr_next = dr;
1235                 } else {
1236                         dr->dr_next = dr;
1237                 }
1238                 rdmandupreqs++;
1239         } else {
1240                 dr = rdmadrmru->dr_next;
1241                 while (dr->dr_status == DUP_INPROGRESS) {
1242                         dr = dr->dr_next;
1243                         if (dr == rdmadrmru->dr_next) {
1244                                 mutex_exit(&rdmadupreq_lock);
1245                                 return (DUP_ERROR);
1246                         }
1247                 }
1248                 unhash(dr);
1249                 if (dr->dr_resfree) {
1250                         (*dr->dr_resfree)(dr->dr_resp.buf);
1251                 }
1252         }
1253         dr->dr_resfree = NULL;
1254         rdmadrmru = dr;
1255 
1256         dr->dr_xid = REQTOXID(req);
1257         dr->dr_prog = req->rq_prog;
1258         dr->dr_vers = req->rq_vers;
1259         dr->dr_proc = req->rq_proc;
1260         if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1261                 if (dr->dr_addr.buf != NULL)
1262                         kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1263                 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1264                 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1265                 if (dr->dr_addr.buf == NULL) {
1266                         dr->dr_addr.maxlen = 0;
1267                         dr->dr_status = DUP_DROP;
1268                         mutex_exit(&rdmadupreq_lock);
1269                         return (DUP_ERROR);
1270                 }
1271         }
1272         dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1273         bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1274         if (dr->dr_resp.maxlen < size) {
1275                 if (dr->dr_resp.buf != NULL)
1276                         kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1277                 dr->dr_resp.maxlen = (unsigned int)size;
1278                 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1279                 if (dr->dr_resp.buf == NULL) {
1280                         dr->dr_resp.maxlen = 0;
1281                         dr->dr_status = DUP_DROP;
1282                         mutex_exit(&rdmadupreq_lock);
1283                         return (DUP_ERROR);
1284                 }
1285         }
1286         dr->dr_status = DUP_INPROGRESS;
1287 
1288         drhash = (uint32_t)DRHASH(dr);
1289         dr->dr_chain = rdmadrhashtbl[drhash];
1290         rdmadrhashtbl[drhash] = dr;
1291         rdmadrhashstat[drhash]++;
1292         mutex_exit(&rdmadupreq_lock);
1293         *drpp = dr;
1294         return (DUP_NEW);
1295 }
1296 
1297 /*
1298  * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1299  * and stores the response.
1300  */
1301 static void
1302 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1303         int size, int status)
1304 {
1305         ASSERT(dr->dr_resfree == NULL);
1306         if (status == DUP_DONE) {
1307                 bcopy(res, dr->dr_resp.buf, size);
1308                 dr->dr_resfree = dis_resfree;
1309         }
1310         dr->dr_status = status;
1311 }
1312 
1313 /*
1314  * This routine expects that the mutex, rdmadupreq_lock, is already held.
1315  */
1316 static void
1317 unhash(struct dupreq *dr)
1318 {
1319         struct dupreq *drt;
1320         struct dupreq *drtprev = NULL;
1321         uint32_t drhash;
1322 
1323         ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1324 
1325         drhash = (uint32_t)DRHASH(dr);
1326         drt = rdmadrhashtbl[drhash];
1327         while (drt != NULL) {
1328                 if (drt == dr) {
1329                         rdmadrhashstat[drhash]--;
1330                         if (drtprev == NULL) {
1331                                 rdmadrhashtbl[drhash] = drt->dr_chain;
1332                         } else {
1333                                 drtprev->dr_chain = drt->dr_chain;
1334                         }
1335                         return;
1336                 }
1337                 drtprev = drt;
1338                 drt = drt->dr_chain;
1339         }
1340 }
1341 
1342 bool_t
1343 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1344 {
1345         struct clist    *clist;
1346         uint32_t        tlen;
1347 
1348         if (req->rq_xprt->xp_type != T_RDMA) {
1349                 return (FALSE);
1350         }
1351 
1352         tlen = 0;
1353         clist = wlist;
1354         while (clist) {
1355                 tlen += clist->c_len;
1356                 clist = clist->c_next;
1357         }
1358 
1359         /*
1360          * set iov to addr+len of first segment of first wchunk of
1361          * wlist sent by client.  krecv() already malloc'd a buffer
1362          * large enough, but registration is deferred until we write
1363          * the buffer back to (NFS) client using RDMA_WRITE.
1364          */
1365         iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1366         iov->iov_len = tlen;
1367 
1368         return (TRUE);
1369 }
1370 
1371 /*
1372  * routine to setup the read chunk lists
1373  */
1374 
1375 int
1376 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
1377 {
1378         int             data_len, avail_len;
1379         uint_t          round_len;
1380 
1381         data_len = avail_len = 0;
1382 
1383         while (wcl != NULL && count > 0) {
1384                 if (wcl->c_dmemhandle.mrc_rmr == 0)
1385                         break;
1386 
1387                 if (wcl->c_len < count) {
1388                         data_len += wcl->c_len;
1389                         avail_len = 0;
1390                 } else {
1391                         data_len += count;
1392                         avail_len = wcl->c_len - count;
1393                         wcl->c_len = count;
1394                 }
1395                 count -= wcl->c_len;
1396 
1397                 if (count == 0)
1398                         break;
1399 
1400                 wcl = wcl->c_next;
1401         }
1402 
1403         /*
1404          * MUST fail if there are still more data
1405          */
1406         if (count > 0) {
1407                 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
1408                     int, data_len, int, count);
1409                 return (FALSE);
1410         }
1411 
1412         /*
1413          * Round up the last chunk to 4-byte boundary
1414          */
1415         *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
1416         round_len = *wcl_len - data_len;
1417 
1418         if (round_len) {
1419 
1420                 /*
1421                  * If there is space in the current chunk,
1422                  * add the roundup to the chunk.
1423                  */
1424                 if (avail_len >= round_len) {
1425                         wcl->c_len += round_len;
1426                 } else  {
1427                         /*
1428                          * try the next one.
1429                          */
1430                         wcl = wcl->c_next;
1431                         if ((wcl == NULL) || (wcl->c_len < round_len)) {
1432                                 DTRACE_PROBE1(
1433                                     krpc__e__rdma_setup_read_chunks_rndup,
1434                                     int, round_len);
1435                                 return (FALSE);
1436                         }
1437                         wcl->c_len = round_len;
1438                 }
1439         }
1440 
1441         wcl = wcl->c_next;
1442 
1443         /*
1444          * Make rest of the chunks 0-len
1445          */
1446 
1447         clist_zero_len(wcl);
1448 
1449         return (TRUE);
1450 }