1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright (c) 2012 by Delphix. All rights reserved.
  24  * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  25  * Copyright 2012 Marcel Telka <marcel@telka.sk>
  26  * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
  27  */
  28 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  29 /* All Rights Reserved */
  30 /*
  31  * Portions of this source code were derived from Berkeley
  32  * 4.3 BSD under license from the Regents of the University of
  33  * California.
  34  */
  35 
  36 /*
  37  * Server side of RPC over RDMA in the kernel.
  38  */
  39 
  40 #include <sys/param.h>
  41 #include <sys/types.h>
  42 #include <sys/user.h>
  43 #include <sys/sysmacros.h>
  44 #include <sys/proc.h>
  45 #include <sys/file.h>
  46 #include <sys/errno.h>
  47 #include <sys/kmem.h>
  48 #include <sys/debug.h>
  49 #include <sys/systm.h>
  50 #include <sys/cmn_err.h>
  51 #include <sys/kstat.h>
  52 #include <sys/vtrace.h>
  53 #include <sys/debug.h>
  54 
  55 #include <rpc/types.h>
  56 #include <rpc/xdr.h>
  57 #include <rpc/auth.h>
  58 #include <rpc/clnt.h>
  59 #include <rpc/rpc_msg.h>
  60 #include <rpc/svc.h>
  61 #include <rpc/rpc_rdma.h>
  62 #include <sys/ddi.h>
  63 #include <sys/sunddi.h>
  64 
  65 #include <inet/common.h>
  66 #include <inet/ip.h>
  67 #include <inet/ip6.h>
  68 
  69 #include <nfs/nfs.h>
  70 #include <sys/sdt.h>
  71 
  72 #define SVC_RDMA_SUCCESS 0
  73 #define SVC_RDMA_FAIL -1
  74 
  75 #define SVC_CREDIT_FACTOR (0.5)
  76 
  77 #define MSG_IS_RPCSEC_GSS(msg)          \
  78         ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)
  79 
  80 
/*
 * Initialized from RDMA_BUFS_GRANT.
 * NOTE(review): presumably the buffer credit granted to clients -- confirm
 * against the code that reads it.
 */
uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
  82 
  83 /*
  84  * RDMA transport specific data associated with SVCMASTERXPRT
  85  */
  86 struct rdma_data {
  87         SVCMASTERXPRT   *rd_xprt;       /* back ptr to SVCMASTERXPRT */
  88         struct rdma_svc_data rd_data;   /* rdma data */
  89         rdma_mod_t      *r_mod;         /* RDMA module containing ops ptr */
  90 };
  91 
  92 /*
  93  * Plugin connection specific data stashed away in clone SVCXPRT
  94  */
  95 struct clone_rdma_data {
  96         bool_t          cloned;         /* xprt cloned for thread processing */
  97         CONN            *conn;          /* RDMA connection */
  98         rdma_buf_t      rpcbuf;         /* RPC req/resp buffer */
  99         struct clist    *cl_reply;      /* reply chunk buffer info */
 100         struct clist    *cl_wlist;              /* write list clist */
 101 };
 102 
 103 
 104 #define MAXADDRLEN      128     /* max length for address mask */
 105 
 106 /*
 107  * Routines exported through ops vector.
 108  */
 109 static bool_t           svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
 110 static bool_t           svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
 111 static bool_t           svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
 112 static bool_t           svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
 113 void                    svc_rdma_kdestroy(SVCMASTERXPRT *);
 114 static int              svc_rdma_kdup(struct svc_req *, caddr_t, int,
 115                                 struct dupreq **, bool_t *);
 116 static void             svc_rdma_kdupdone(struct dupreq *, caddr_t,
 117                                 void (*)(), int, int);
 118 static int32_t          *svc_rdma_kgetres(SVCXPRT *, int);
 119 static void             svc_rdma_kfreeres(SVCXPRT *);
 120 static void             svc_rdma_kclone_destroy(SVCXPRT *);
 121 static void             svc_rdma_kstart(SVCMASTERXPRT *);
 122 void                    svc_rdma_kstop(SVCMASTERXPRT *);
 123 static void             svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
 124 static void             svc_rdma_ktattrs(SVCXPRT *, int, void **);
 125 
 126 static int      svc_process_long_reply(SVCXPRT *, xdrproc_t,
 127                         caddr_t, struct rpc_msg *, bool_t, int *,
 128                         int *, int *, unsigned int *);
 129 
 130 static int      svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
 131                         caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
 132                         bool_t, uint_t *);
 133 static bool_t rpcmsg_length(xdrproc_t,
 134                 caddr_t,
 135                 struct rpc_msg *, bool_t, int);
 136 
 137 /*
 138  * Server transport operations vector.
 139  */
 140 struct svc_ops rdma_svc_ops = {
 141         svc_rdma_krecv,         /* Get requests */
 142         svc_rdma_kgetargs,      /* Deserialize arguments */
 143         svc_rdma_ksend,         /* Send reply */
 144         svc_rdma_kfreeargs,     /* Free argument data space */
 145         svc_rdma_kdestroy,      /* Destroy transport handle */
 146         svc_rdma_kdup,          /* Check entry in dup req cache */
 147         svc_rdma_kdupdone,      /* Mark entry in dup req cache as done */
 148         svc_rdma_kgetres,       /* Get pointer to response buffer */
 149         svc_rdma_kfreeres,      /* Destroy pre-serialized response header */
 150         svc_rdma_kclone_destroy,        /* Destroy a clone xprt */
 151         svc_rdma_kstart,        /* Tell `ready-to-receive' to rpcmod */
 152         svc_rdma_kclone_xprt,   /* Transport specific clone xprt */
 153         svc_rdma_ktattrs,       /* Get Transport Attributes */
 154         NULL,                   /* Increment transport reference count */
 155         NULL                    /* Decrement transport reference count */
 156 };
 157 
 158 /*
 159  * Server statistics
 160  * NOTE: This structure type is duplicated in the NFS fast path.
 161  */
 162 struct {
 163         kstat_named_t   rscalls;
 164         kstat_named_t   rsbadcalls;
 165         kstat_named_t   rsnullrecv;
 166         kstat_named_t   rsbadlen;
 167         kstat_named_t   rsxdrcall;
 168         kstat_named_t   rsdupchecks;
 169         kstat_named_t   rsdupreqs;
 170         kstat_named_t   rslongrpcs;
 171         kstat_named_t   rstotalreplies;
 172         kstat_named_t   rstotallongreplies;
 173         kstat_named_t   rstotalinlinereplies;
 174 } rdmarsstat = {
 175         { "calls",      KSTAT_DATA_UINT64 },
 176         { "badcalls",   KSTAT_DATA_UINT64 },
 177         { "nullrecv",   KSTAT_DATA_UINT64 },
 178         { "badlen",     KSTAT_DATA_UINT64 },
 179         { "xdrcall",    KSTAT_DATA_UINT64 },
 180         { "dupchecks",  KSTAT_DATA_UINT64 },
 181         { "dupreqs",    KSTAT_DATA_UINT64 },
 182         { "longrpcs",   KSTAT_DATA_UINT64 },
 183         { "totalreplies",       KSTAT_DATA_UINT64 },
 184         { "totallongreplies",   KSTAT_DATA_UINT64 },
 185         { "totalinlinereplies", KSTAT_DATA_UINT64 },
 186 };
 187 
 188 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
 189 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
 190 
 191 #define RSSTAT_INCR(x)  atomic_inc_64(&rdmarsstat.x.value.ui64)
 192 /*
 193  * Create a transport record.
 194  * The transport record, output buffer, and private data structure
 195  * are allocated.  The output buffer is serialized into using xdrmem.
 196  * There is one transport record per user process which implements a
 197  * set of services.
 198  */
 199 /* ARGSUSED */
 200 int
 201 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
 202     rdma_xprt_group_t *started_xprts)
 203 {
 204         int error;
 205         SVCMASTERXPRT *xprt;
 206         struct rdma_data *rd;
 207         rdma_registry_t *rmod;
 208         rdma_xprt_record_t *xprt_rec;
 209         queue_t *q;
 210         /*
 211          * modload the RDMA plugins is not already done.
 212          */
 213         if (!rdma_modloaded) {
 214                 /*CONSTANTCONDITION*/
 215                 ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
 216 
 217                 mutex_enter(&rdma_modload_lock);
 218                 if (!rdma_modloaded) {
 219                         error = rdma_modload();
 220                 }
 221                 mutex_exit(&rdma_modload_lock);
 222 
 223                 if (error)
 224                         return (error);
 225         }
 226 
 227         /*
 228          * master_xprt_count is the count of master transport handles
 229          * that were successfully created and are ready to recieve for
 230          * RDMA based access.
 231          */
 232         error = 0;
 233         xprt_rec = NULL;
 234         rw_enter(&rdma_lock, RW_READER);
 235         if (rdma_mod_head == NULL) {
 236                 started_xprts->rtg_count = 0;
 237                 rw_exit(&rdma_lock);
 238                 if (rdma_dev_available)
 239                         return (EPROTONOSUPPORT);
 240                 else
 241                         return (ENODEV);
 242         }
 243 
 244         /*
 245          * If we have reached here, then atleast one RDMA plugin has loaded.
 246          * Create a master_xprt, make it start listenining on the device,
 247          * if an error is generated, record it, we might need to shut
 248          * the master_xprt.
 249          * SVC_START() calls svc_rdma_kstart which calls plugin binding
 250          * routines.
 251          */
 252         for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
 253 
 254                 /*
 255                  * One SVCMASTERXPRT per RDMA plugin.
 256                  */
 257                 xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
 258                 xprt->xp_ops = &rdma_svc_ops;
 259                 xprt->xp_sct = sct;
 260                 xprt->xp_type = T_RDMA;
 261                 mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
 262                 mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 263                 xprt->xp_req_head = (mblk_t *)0;
 264                 xprt->xp_req_tail = (mblk_t *)0;
 265                 xprt->xp_full = FALSE;
 266                 xprt->xp_enable = FALSE;
 267                 xprt->xp_reqs = 0;
 268                 xprt->xp_size = 0;
 269                 xprt->xp_threads = 0;
 270                 xprt->xp_detached_threads = 0;
 271 
 272                 rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
 273                 xprt->xp_p2 = (caddr_t)rd;
 274                 rd->rd_xprt = xprt;
 275                 rd->r_mod = rmod->r_mod;
 276 
 277                 q = &rd->rd_data.q;
 278                 xprt->xp_wq = q;
 279                 q->q_ptr = &rd->rd_xprt;
 280                 xprt->xp_netid = NULL;
 281 
 282                 /*
 283                  * Each of the plugins will have their own Service ID
 284                  * to listener specific mapping, like port number for VI
 285                  * and service name for IB.
 286                  */
 287                 rd->rd_data.svcid = id;
 288                 error = svc_xprt_register(xprt, id);
 289                 if (error) {
 290                         DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
 291                         goto cleanup;
 292                 }
 293 
 294                 SVC_START(xprt);
 295                 if (!rd->rd_data.active) {
 296                         svc_xprt_unregister(xprt);
 297                         error = rd->rd_data.err_code;
 298                         goto cleanup;
 299                 }
 300 
 301                 /*
 302                  * This is set only when there is atleast one or more
 303                  * transports successfully created. We insert the pointer
 304                  * to the created RDMA master xprt into a separately maintained
 305                  * list. This way we can easily reference it later to cleanup,
 306                  * when NFS kRPC service pool is going away/unregistered.
 307                  */
 308                 started_xprts->rtg_count ++;
 309                 xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
 310                 xprt_rec->rtr_xprt_ptr = xprt;
 311                 xprt_rec->rtr_next = started_xprts->rtg_listhead;
 312                 started_xprts->rtg_listhead = xprt_rec;
 313                 continue;
 314 cleanup:
 315                 SVC_DESTROY(xprt);
 316                 if (error == RDMA_FAILED)
 317                         error = EPROTONOSUPPORT;
 318         }
 319 
 320         rw_exit(&rdma_lock);
 321 
 322         /*
 323          * Don't return any error even if a single plugin was started
 324          * successfully.
 325          */
 326         if (started_xprts->rtg_count == 0)
 327                 return (error);
 328         return (0);
 329 }
 330 
 331 /*
 332  * Cleanup routine for freeing up memory allocated by
 333  * svc_rdma_kcreate()
 334  */
 335 void
 336 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
 337 {
 338         struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
 339 
 340 
 341         mutex_destroy(&xprt->xp_req_lock);
 342         mutex_destroy(&xprt->xp_thread_lock);
 343         kmem_free(rd, sizeof (*rd));
 344         kmem_free(xprt, sizeof (*xprt));
 345 }
 346 
 347 
 348 static void
 349 svc_rdma_kstart(SVCMASTERXPRT *xprt)
 350 {
 351         struct rdma_svc_data *svcdata;
 352         rdma_mod_t *rmod;
 353 
 354         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 355         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 356 
 357         /*
 358          * Create a listener for  module at this port
 359          */
 360 
 361         if (rmod->rdma_count != 0)
 362                 (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
 363         else
 364                 svcdata->err_code = RDMA_FAILED;
 365 }
 366 
 367 void
 368 svc_rdma_kstop(SVCMASTERXPRT *xprt)
 369 {
 370         struct rdma_svc_data *svcdata;
 371         rdma_mod_t *rmod;
 372 
 373         svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 374         rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 375 
 376         /*
 377          * Call the stop listener routine for each plugin. If rdma_count is
 378          * already zero set active to zero.
 379          */
 380         if (rmod->rdma_count != 0)
 381                 (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
 382         else
 383                 svcdata->active = 0;
 384         if (svcdata->active)
 385                 DTRACE_PROBE(krpc__e__svcrdma__kstop);
 386 }
 387 
 388 /* ARGSUSED */
 389 static void
 390 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
 391 {
 392 
 393         struct clone_rdma_data *cdrp;
 394         cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 395 
 396         /*
 397          * Only free buffers and release connection when cloned is set.
 398          */
 399         if (cdrp->cloned != TRUE)
 400                 return;
 401 
 402         rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
 403         if (cdrp->cl_reply) {
 404                 clist_free(cdrp->cl_reply);
 405                 cdrp->cl_reply = NULL;
 406         }
 407         RDMA_REL_CONN(cdrp->conn);
 408 
 409         cdrp->cloned = 0;
 410 }
 411 
 412 /*
 413  * Clone the xprt specific information.  It will be freed by
 414  * SVC_CLONE_DESTROY.
 415  */
 416 static void
 417 svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
 418 {
 419         struct clone_rdma_data *srcp2;
 420         struct clone_rdma_data *dstp2;
 421 
 422         srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
 423         dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
 424 
 425         if (srcp2->conn != NULL) {
 426                 srcp2->cloned = TRUE;
 427                 *dstp2 = *srcp2;
 428         }
 429 }
 430 
 431 static void
 432 svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
 433 {
 434         CONN    *conn;
 435         *tattr = NULL;
 436 
 437         switch (attrflag) {
 438         case SVC_TATTR_ADDRMASK:
 439                 conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
 440                 ASSERT(conn != NULL);
 441                 if (conn)
 442                         *tattr = (void *)&conn->c_addrmask;
 443         }
 444 }
 445 
 446 static bool_t
 447 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
 448 {
 449         XDR     *xdrs;
 450         CONN    *conn;
 451         rdma_recv_data_t        *rdp = (rdma_recv_data_t *)mp->b_rptr;
 452         struct clone_rdma_data *crdp;
 453         struct clist    *cl = NULL;
 454         struct clist    *wcl = NULL;
 455         struct clist    *cllong = NULL;
 456 
 457         rdma_stat       status;
 458         uint32_t vers, op, pos, xid;
 459         uint32_t rdma_credit;
 460         uint32_t wcl_total_length = 0;
 461         bool_t  wwl = FALSE;
 462 
 463         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 464         RSSTAT_INCR(rscalls);
 465         conn = rdp->conn;
 466 
 467         status = rdma_svc_postrecv(conn);
 468         if (status != RDMA_SUCCESS) {
 469                 DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
 470                 goto badrpc_call;
 471         }
 472 
 473         xdrs = &clone_xprt->xp_xdrin;
 474         xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
 475         xid = *(uint32_t *)rdp->rpcmsg.addr;
 476         XDR_SETPOS(xdrs, sizeof (uint32_t));
 477 
 478         if (! xdr_u_int(xdrs, &vers) ||
 479             ! xdr_u_int(xdrs, &rdma_credit) ||
 480             ! xdr_u_int(xdrs, &op)) {
 481                 DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
 482                 goto xdr_err;
 483         }
 484 
 485         /* Checking if the status of the recv operation was normal */
 486         if (rdp->status != 0) {
 487                 DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
 488                     int, rdp->status);
 489                 goto badrpc_call;
 490         }
 491 
 492         if (! xdr_do_clist(xdrs, &cl)) {
 493                 DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
 494                 goto xdr_err;
 495         }
 496 
 497         if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
 498                 DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
 499                 if (cl)
 500                         clist_free(cl);
 501                 goto xdr_err;
 502         }
 503         crdp->cl_wlist = wcl;
 504 
 505         crdp->cl_reply = NULL;
 506         (void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);
 507 
 508         /*
 509          * A chunk at 0 offset indicates that the RPC call message
 510          * is in a chunk. Get the RPC call message chunk.
 511          */
 512         if (cl != NULL && op == RDMA_NOMSG) {
 513 
 514                 /* Remove RPC call message chunk from chunklist */
 515                 cllong = cl;
 516                 cl = cl->c_next;
 517                 cllong->c_next = NULL;
 518 
 519 
 520                 /* Allocate and register memory for the RPC call msg chunk */
 521                 cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
 522                 cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
 523                     cllong->c_len : LONG_REPLY_LEN;
 524 
 525                 if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
 526                         clist_free(cllong);
 527                         goto cll_malloc_err;
 528                 }
 529 
 530                 cllong->u.c_daddr3 = cllong->rb_longbuf.addr;
 531 
 532                 if (cllong->u.c_daddr == NULL) {
 533                         DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
 534                         rdma_buf_free(conn, &cllong->rb_longbuf);
 535                         clist_free(cllong);
 536                         goto cll_malloc_err;
 537                 }
 538 
 539                 status = clist_register(conn, cllong, CLIST_REG_DST);
 540                 if (status) {
 541                         DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
 542                         rdma_buf_free(conn, &cllong->rb_longbuf);
 543                         clist_free(cllong);
 544                         goto cll_malloc_err;
 545                 }
 546 
 547                 /*
 548                  * Now read the RPC call message in
 549                  */
 550                 status = RDMA_READ(conn, cllong, WAIT);
 551                 if (status) {
 552                         DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
 553                         (void) clist_deregister(conn, cllong);
 554                         rdma_buf_free(conn, &cllong->rb_longbuf);
 555                         clist_free(cllong);
 556                         goto cll_malloc_err;
 557                 }
 558 
 559                 status = clist_syncmem(conn, cllong, CLIST_REG_DST);
 560                 (void) clist_deregister(conn, cllong);
 561 
 562                 xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
 563                     cllong->c_len, 0, cl, XDR_DECODE, conn);
 564 
 565                 crdp->rpcbuf = cllong->rb_longbuf;
 566                 crdp->rpcbuf.len = cllong->c_len;
 567                 clist_free(cllong);
 568                 RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 569         } else {
 570                 pos = XDR_GETPOS(xdrs);
 571                 xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
 572                     rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
 573                 crdp->rpcbuf = rdp->rpcmsg;
 574 
 575                 /* Use xdrrdmablk_ops to indicate there is a read chunk list */
 576                 if (cl != NULL) {
 577                         int32_t flg = XDR_RDMA_RLIST_REG;
 578 
 579                         XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 580                         xdrs->x_ops = &xdrrdmablk_ops;
 581                 }
 582         }
 583 
 584         if (crdp->cl_wlist) {
 585                 int32_t flg = XDR_RDMA_WLIST_REG;
 586 
 587                 XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
 588                 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 589         }
 590 
 591         if (! xdr_callmsg(xdrs, msg)) {
 592                 DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
 593                 RSSTAT_INCR(rsxdrcall);
 594                 goto callmsg_err;
 595         }
 596 
 597         /*
 598          * Point the remote transport address in the service_transport
 599          * handle at the address in the request.
 600          */
 601         clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
 602         clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
 603         clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
 604 
 605         clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
 606         clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
 607         clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;
 608 
 609         /*
 610          * In case of RDMA, connection management is
 611          * entirely done in rpcib module and netid in the
 612          * SVCMASTERXPRT is NULL. Initialize the clone netid
 613          * from the connection.
 614          */
 615 
 616         clone_xprt->xp_netid = conn->c_netid;
 617 
 618         clone_xprt->xp_xid = xid;
 619         crdp->conn = conn;
 620 
 621         freeb(mp);
 622 
 623         return (TRUE);
 624 
 625 callmsg_err:
 626         rdma_buf_free(conn, &crdp->rpcbuf);
 627 
 628 cll_malloc_err:
 629         if (cl)
 630                 clist_free(cl);
 631 xdr_err:
 632         XDR_DESTROY(xdrs);
 633 
 634 badrpc_call:
 635         RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 636         RDMA_REL_CONN(conn);
 637         freeb(mp);
 638         RSSTAT_INCR(rsbadcalls);
 639         return (FALSE);
 640 }
 641 
 642 static int
 643 svc_process_long_reply(SVCXPRT * clone_xprt,
 644     xdrproc_t xdr_results, caddr_t xdr_location,
 645     struct rpc_msg *msg, bool_t has_args, int *msglen,
 646     int *freelen, int *numchunks, unsigned int *final_len)
 647 {
 648         int status;
 649         XDR xdrslong;
 650         struct clist *wcl = NULL;
 651         int count = 0;
 652         int alloc_len;
 653         char  *memp;
 654         rdma_buf_t long_rpc = {0};
 655         struct clone_rdma_data *crdp;
 656 
 657         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 658 
 659         bzero(&xdrslong, sizeof (xdrslong));
 660 
 661         /* Choose a size for the long rpc response */
 662         if (MSG_IS_RPCSEC_GSS(msg)) {
 663                 alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
 664         } else {
 665                 alloc_len = RNDUP(*msglen);
 666         }
 667 
 668         if (alloc_len <= 64 * 1024) {
 669                 if (alloc_len > 32 * 1024) {
 670                         alloc_len = 64 * 1024;
 671                 } else {
 672                         if (alloc_len > 16 * 1024) {
 673                                 alloc_len = 32 * 1024;
 674                         } else {
 675                                 alloc_len = 16 * 1024;
 676                         }
 677                 }
 678         }
 679 
 680         long_rpc.type = RDMA_LONG_BUFFER;
 681         long_rpc.len = alloc_len;
 682         if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
 683                 return (SVC_RDMA_FAIL);
 684         }
 685 
 686         memp = long_rpc.addr;
 687         xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);
 688 
 689         msg->rm_xid = clone_xprt->xp_xid;
 690 
 691         if (!(xdr_replymsg(&xdrslong, msg) &&
 692             (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
 693             xdr_results, xdr_location)))) {
 694                 rdma_buf_free(crdp->conn, &long_rpc);
 695                 DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
 696                 return (SVC_RDMA_FAIL);
 697         }
 698 
 699         *final_len = XDR_GETPOS(&xdrslong);
 700 
 701         DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
 702         *numchunks = 0;
 703         *freelen = 0;
 704 
 705         wcl = crdp->cl_reply;
 706         wcl->rb_longbuf = long_rpc;
 707 
 708         count = *final_len;
 709         while ((wcl != NULL) && (count > 0)) {
 710 
 711                 if (wcl->c_dmemhandle.mrc_rmr == 0)
 712                         break;
 713 
 714                 DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
 715                     uint32_t, wcl->c_len);
 716 
 717                 if (wcl->c_len > count) {
 718                         wcl->c_len = count;
 719                 }
 720                 wcl->w.c_saddr3 = (caddr_t)memp;
 721 
 722                 count -= wcl->c_len;
 723                 *numchunks +=  1;
 724                 memp += wcl->c_len;
 725                 wcl = wcl->c_next;
 726         }
 727 
 728         /*
 729          * Make rest of the chunks 0-len
 730          */
 731         while (wcl != NULL) {
 732                 if (wcl->c_dmemhandle.mrc_rmr == 0)
 733                         break;
 734                 wcl->c_len = 0;
 735                 wcl = wcl->c_next;
 736         }
 737 
 738         wcl = crdp->cl_reply;
 739 
 740         /*
 741          * MUST fail if there are still more data
 742          */
 743         if (count > 0) {
 744                 rdma_buf_free(crdp->conn, &long_rpc);
 745                 DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
 746                 return (SVC_RDMA_FAIL);
 747         }
 748 
 749         if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
 750                 rdma_buf_free(crdp->conn, &long_rpc);
 751                 DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
 752                 return (SVC_RDMA_FAIL);
 753         }
 754 
 755         status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);
 756 
 757         if (status) {
 758                 (void) clist_deregister(crdp->conn, wcl);
 759                 rdma_buf_free(crdp->conn, &long_rpc);
 760                 DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
 761                 return (SVC_RDMA_FAIL);
 762         }
 763 
 764         status = RDMA_WRITE(crdp->conn, wcl, WAIT);
 765 
 766         (void) clist_deregister(crdp->conn, wcl);
 767         rdma_buf_free(crdp->conn, &wcl->rb_longbuf);
 768 
 769         if (status != RDMA_SUCCESS) {
 770                 DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
 771                 return (SVC_RDMA_FAIL);
 772         }
 773 
 774         return (SVC_RDMA_SUCCESS);
 775 }
 776 
 777 
 778 static int
 779 svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
 780     caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
 781     struct rpc_msg *msg, bool_t has_args, uint_t *len)
 782 {
 783         /*
 784          * Get a pre-allocated buffer for rpc reply
 785          */
 786         rpcreply->type = SEND_BUFFER;
 787         if (rdma_buf_alloc(conn, rpcreply)) {
 788                 DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
 789                 return (SVC_RDMA_FAIL);
 790         }
 791 
 792         xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
 793             0, NULL, XDR_ENCODE, conn);
 794 
 795         msg->rm_xid = clone_xprt->xp_xid;
 796 
 797         if (has_args) {
 798                 if (!(xdr_replymsg(*xdrs, msg) &&
 799                     (!has_args ||
 800                     SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
 801                     xdr_results, xdr_location)))) {
 802                         rdma_buf_free(conn, rpcreply);
 803                         DTRACE_PROBE(
 804                             krpc__e__svcrdma__rpcmsg__reply__authwrap1);
 805                         return (SVC_RDMA_FAIL);
 806                 }
 807         } else {
 808                 if (!xdr_replymsg(*xdrs, msg)) {
 809                         rdma_buf_free(conn, rpcreply);
 810                         DTRACE_PROBE(
 811                             krpc__e__svcrdma__rpcmsg__reply__authwrap2);
 812                         return (SVC_RDMA_FAIL);
 813                 }
 814         }
 815 
 816         *len = XDR_GETPOS(*xdrs);
 817 
 818         return (SVC_RDMA_SUCCESS);
 819 }
 820 
 821 /*
 822  * Send rpc reply.
 823  */
 824 static bool_t
 825 svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
 826 {
 827         XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
 828         XDR xdrs_rhdr;
 829         CONN *conn = NULL;
 830         rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};
 831 
 832         struct clone_rdma_data *crdp;
 833         struct clist *cl_read = NULL;
 834         struct clist *cl_send = NULL;
 835         struct clist *cl_write = NULL;
 836         xdrproc_t xdr_results;          /* results XDR encoding function */
 837         caddr_t xdr_location;           /* response results pointer */
 838 
 839         int retval = FALSE;
 840         int status, msglen, num_wreply_segments = 0;
 841         uint32_t rdma_credit = 0;
 842         int freelen = 0;
 843         bool_t has_args;
 844         uint_t  final_resp_len, rdma_response_op, vers;
 845 
 846         bzero(&xdrs_rhdr, sizeof (XDR));
 847         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 848         conn = crdp->conn;
 849 
 850         /*
 851          * If there is a result procedure specified in the reply message,
 852          * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
 853          * We need to make sure it won't be processed twice, so we null
 854          * it for xdr_replymsg here.
 855          */
 856         has_args = FALSE;
 857         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
 858             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
 859                 if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
 860                         has_args = TRUE;
 861                         xdr_location = msg->acpted_rply.ar_results.where;
 862                         msg->acpted_rply.ar_results.proc = xdr_void;
 863                         msg->acpted_rply.ar_results.where = NULL;
 864                 }
 865         }
 866 
 867         /*
 868          * Given the limit on the inline response size (RPC_MSG_SZ),
 869          * there is a need to make a guess as to the overall size of
 870          * the response.  If the resultant size is beyond the inline
 871          * size, then the server needs to use the "reply chunk list"
 872          * provided by the client (if the client provided one).  An
 873          * example of this type of response would be a READDIR
 874          * response (e.g. a small directory read would fit in RPC_MSG_SZ
 875          * and that is the preference but it may not fit)
 876          *
 877          * Combine the encoded size and the size of the true results
 878          * and then make the decision about where to encode and send results.
 879          *
 880          * One important note, this calculation is ignoring the size
 881          * of the encoding of the authentication overhead.  The reason
 882          * for this is rooted in the complexities of access to the
 883          * encoded size of RPCSEC_GSS related authentiation,
 884          * integrity, and privacy.
 885          *
 886          * If it turns out that the encoded authentication bumps the
 887          * response over the RPC_MSG_SZ limit, then it may need to
 888          * attempt to encode for the reply chunk list.
 889          */
 890 
 891         /*
 892          * Calculating the "sizeof" the RPC response header and the
 893          * encoded results.
 894          */
 895         msglen = xdr_sizeof(xdr_replymsg, msg);
 896 
 897         if (msglen > 0) {
 898                 RSSTAT_INCR(rstotalreplies);
 899         }
 900         if (has_args)
 901                 msglen += xdrrdma_sizeof(xdr_results, xdr_location,
 902                     rdma_minchunk, NULL, NULL);
 903 
 904         DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);
 905 
 906         status = SVC_RDMA_SUCCESS;
 907 
 908         if (msglen < RPC_MSG_SZ) {
 909                 /*
 910                  * Looks like the response will fit in the inline
 911                  * response; let's try
 912                  */
 913                 RSSTAT_INCR(rstotalinlinereplies);
 914 
 915                 rdma_response_op = RDMA_MSG;
 916 
 917                 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
 918                     xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
 919                     has_args, &final_resp_len);
 920 
 921                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
 922                     int, status);
 923                 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
 924                     int, final_resp_len);
 925 
 926                 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
 927                         clist_free(crdp->cl_reply);
 928                         crdp->cl_reply = NULL;
 929                 }
 930         }
 931 
 932         /*
 933          * If the encode failed (size?) or the message really is
 934          * larger than what is allowed, try the response chunk list.
 935          */
 936         if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
 937                 /*
 938                  * attempting to use a reply chunk list when there
 939                  * isn't one won't get very far...
 940                  */
 941                 if (crdp->cl_reply == NULL) {
 942                         DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
 943                         goto out;
 944                 }
 945 
 946                 RSSTAT_INCR(rstotallongreplies);
 947 
 948                 msglen = xdr_sizeof(xdr_replymsg, msg);
 949                 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
 950                     NULL, NULL);
 951 
 952                 status = svc_process_long_reply(clone_xprt, xdr_results,
 953                     xdr_location, msg, has_args, &msglen, &freelen,
 954                     &num_wreply_segments, &final_resp_len);
 955 
 956                 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
 957                     int, final_resp_len);
 958 
 959                 if (status != SVC_RDMA_SUCCESS) {
 960                         DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
 961                         goto out;
 962                 }
 963 
 964                 rdma_response_op = RDMA_NOMSG;
 965         }
 966 
 967         DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
 968             int, final_resp_len);
 969 
 970         rbuf_resp.type = SEND_BUFFER;
 971         if (rdma_buf_alloc(conn, &rbuf_resp)) {
 972                 rdma_buf_free(conn, &rbuf_rpc_resp);
 973                 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
 974                 goto out;
 975         }
 976 
 977         rdma_credit = rdma_bufs_granted;
 978 
 979         vers = RPCRDMA_VERS;
 980         xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
 981         (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
 982         /* Skip xid and set the xdr position accordingly. */
 983         XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
 984         if (!xdr_u_int(&xdrs_rhdr, &vers) ||
 985             !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
 986             !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
 987                 rdma_buf_free(conn, &rbuf_rpc_resp);
 988                 rdma_buf_free(conn, &rbuf_resp);
 989                 DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
 990                 goto out;
 991         }
 992 
 993         /*
 994          * Now XDR the read chunk list, actually always NULL
 995          */
 996         (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);
 997 
 998         /*
 999          * encode write list -- we already drove RDMA_WRITEs
1000          */
1001         cl_write = crdp->cl_wlist;
1002         if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
1003                 DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
1004                 rdma_buf_free(conn, &rbuf_rpc_resp);
1005                 rdma_buf_free(conn, &rbuf_resp);
1006                 goto out;
1007         }
1008 
1009         /*
1010          * XDR encode the RDMA_REPLY write chunk
1011          */
1012         if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
1013             num_wreply_segments)) {
1014                 rdma_buf_free(conn, &rbuf_rpc_resp);
1015                 rdma_buf_free(conn, &rbuf_resp);
1016                 goto out;
1017         }
1018 
1019         clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
1020             rbuf_resp.addr, NULL, NULL);
1021 
1022         if (rdma_response_op == RDMA_MSG) {
1023                 clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
1024                     rbuf_rpc_resp.addr, NULL, NULL);
1025         }
1026 
1027         status = RDMA_SEND(conn, cl_send, msg->rm_xid);
1028 
1029         if (status == RDMA_SUCCESS) {
1030                 retval = TRUE;
1031         }
1032 
1033 out:
1034         /*
1035          * Free up sendlist chunks
1036          */
1037         if (cl_send != NULL)
1038                 clist_free(cl_send);
1039 
1040         /*
1041          * Destroy private data for xdr rdma
1042          */
1043         if (clone_xprt->xp_xdrout.x_ops != NULL) {
1044                 XDR_DESTROY(&(clone_xprt->xp_xdrout));
1045         }
1046 
1047         if (crdp->cl_reply) {
1048                 clist_free(crdp->cl_reply);
1049                 crdp->cl_reply = NULL;
1050         }
1051 
1052         /*
1053          * This is completely disgusting.  If public is set it is
1054          * a pointer to a structure whose first field is the address
1055          * of the function to free that structure and any related
1056          * stuff.  (see rrokfree in nfs_xdr.c).
1057          */
1058         if (xdrs_rpc->x_public) {
1059                 /* LINTED pointer alignment */
1060                 (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
1061         }
1062 
1063         if (xdrs_rhdr.x_ops != NULL) {
1064                 XDR_DESTROY(&xdrs_rhdr);
1065         }
1066 
1067         return (retval);
1068 }
1069 
1070 /*
1071  * Deserialize arguments.
1072  */
1073 static bool_t
1074 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1075 {
1076         if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1077             xdr_args, args_ptr)) != TRUE)
1078                 return (FALSE);
1079         return (TRUE);
1080 }
1081 
1082 static bool_t
1083 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1084     caddr_t args_ptr)
1085 {
1086         struct clone_rdma_data *crdp;
1087         bool_t retval;
1088 
1089         /*
1090          * If the cloned bit is true, then this transport specific
1091          * rmda data has been duplicated into another cloned xprt. Do
1092          * not free, or release the connection, it is still in use.  The
1093          * buffers will be freed and the connection released later by
1094          * SVC_CLONE_DESTROY().
1095          */
1096         crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1097         if (crdp->cloned == TRUE) {
1098                 crdp->cloned = 0;
1099                 return (TRUE);
1100         }
1101 
1102         /*
1103          * Free the args if needed then XDR_DESTROY
1104          */
1105         if (args_ptr) {
1106                 XDR     *xdrs = &clone_xprt->xp_xdrin;
1107 
1108                 xdrs->x_op = XDR_FREE;
1109                 retval = (*xdr_args)(xdrs, args_ptr);
1110         }
1111 
1112         XDR_DESTROY(&(clone_xprt->xp_xdrin));
1113         rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1114         if (crdp->cl_reply) {
1115                 clist_free(crdp->cl_reply);
1116                 crdp->cl_reply = NULL;
1117         }
1118         RDMA_REL_CONN(crdp->conn);
1119 
1120         return (retval);
1121 }
1122 
1123 /* ARGSUSED */
1124 static int32_t *
1125 svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
1126 {
1127         return (NULL);
1128 }
1129 
1130 /* ARGSUSED */
1131 static void
1132 svc_rdma_kfreeres(SVCXPRT *clone_xprt)
1133 {
1134 }
1135 
/*
 * The dup caching routines below provide a cache of non-failure
 * transaction ids.  RPC service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define MAXDUPREQS      8192

/*
 * This should be appropriately scaled to MAXDUPREQS.  To produce as few
 * collisions as possible, it is suggested to set this to a prime.
 */
#define DRHASHSZ        2053

/* Hash bucket selection: by transaction id, modulo the table size. */
#define XIDHASH(xid)    ((xid) % DRHASHSZ)
#define DRHASH(dr)      XIDHASH((dr)->dr_xid)
#define REQTOXID(req)   ((req)->rq_xprt->xp_xid)

/* Number of cache entries allocated so far; bounded by rdmamaxdupreqs. */
static int      rdmandupreqs = 0;
/* Upper bound on allocated entries; beyond it old entries are recycled. */
int     rdmamaxdupreqs = MAXDUPREQS;
/* Held by svc_rdma_kdup() and required by unhash(). */
static kmutex_t rdmadupreq_lock;
/* Hash table of cached requests, chained through dr_chain. */
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
/* Per-bucket entry count: bumped on insert, dropped in unhash(). */
static int      rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next == drlru, i.e. the entry following the most recently
 * used one is the first candidate for recycling.
 */
struct dupreq *rdmadrmru;
1172 
1173 /*
1174  * svc_rdma_kdup searches the request cache and returns 0 if the
1175  * request is not found in the cache.  If it is found, then it
1176  * returns the state of the request (in progress or done) and
1177  * the status or attributes that were part of the original reply.
1178  */
1179 static int
1180 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1181     bool_t *dupcachedp)
1182 {
1183         struct dupreq *dr;
1184         uint32_t xid;
1185         uint32_t drhash;
1186         int status;
1187 
1188         xid = REQTOXID(req);
1189         mutex_enter(&rdmadupreq_lock);
1190         RSSTAT_INCR(rsdupchecks);
1191         /*
1192          * Check to see whether an entry already exists in the cache.
1193          */
1194         dr = rdmadrhashtbl[XIDHASH(xid)];
1195         while (dr != NULL) {
1196                 if (dr->dr_xid == xid &&
1197                     dr->dr_proc == req->rq_proc &&
1198                     dr->dr_prog == req->rq_prog &&
1199                     dr->dr_vers == req->rq_vers &&
1200                     dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1201                     bcmp((caddr_t)dr->dr_addr.buf,
1202                     (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1203                     dr->dr_addr.len) == 0) {
1204                         status = dr->dr_status;
1205                         if (status == DUP_DONE) {
1206                                 bcopy(dr->dr_resp.buf, res, size);
1207                                 if (dupcachedp != NULL)
1208                                         *dupcachedp = (dr->dr_resfree != NULL);
1209                         } else {
1210                                 dr->dr_status = DUP_INPROGRESS;
1211                                 *drpp = dr;
1212                         }
1213                         RSSTAT_INCR(rsdupreqs);
1214                         mutex_exit(&rdmadupreq_lock);
1215                         return (status);
1216                 }
1217                 dr = dr->dr_chain;
1218         }
1219 
1220         /*
1221          * There wasn't an entry, either allocate a new one or recycle
1222          * an old one.
1223          */
1224         if (rdmandupreqs < rdmamaxdupreqs) {
1225                 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1226                 if (dr == NULL) {
1227                         mutex_exit(&rdmadupreq_lock);
1228                         return (DUP_ERROR);
1229                 }
1230                 dr->dr_resp.buf = NULL;
1231                 dr->dr_resp.maxlen = 0;
1232                 dr->dr_addr.buf = NULL;
1233                 dr->dr_addr.maxlen = 0;
1234                 if (rdmadrmru) {
1235                         dr->dr_next = rdmadrmru->dr_next;
1236                         rdmadrmru->dr_next = dr;
1237                 } else {
1238                         dr->dr_next = dr;
1239                 }
1240                 rdmandupreqs++;
1241         } else {
1242                 dr = rdmadrmru->dr_next;
1243                 while (dr->dr_status == DUP_INPROGRESS) {
1244                         dr = dr->dr_next;
1245                         if (dr == rdmadrmru->dr_next) {
1246                                 mutex_exit(&rdmadupreq_lock);
1247                                 return (DUP_ERROR);
1248                         }
1249                 }
1250                 unhash(dr);
1251                 if (dr->dr_resfree) {
1252                         (*dr->dr_resfree)(dr->dr_resp.buf);
1253                 }
1254         }
1255         dr->dr_resfree = NULL;
1256         rdmadrmru = dr;
1257 
1258         dr->dr_xid = REQTOXID(req);
1259         dr->dr_prog = req->rq_prog;
1260         dr->dr_vers = req->rq_vers;
1261         dr->dr_proc = req->rq_proc;
1262         if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1263                 if (dr->dr_addr.buf != NULL)
1264                         kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1265                 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1266                 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1267                 if (dr->dr_addr.buf == NULL) {
1268                         dr->dr_addr.maxlen = 0;
1269                         dr->dr_status = DUP_DROP;
1270                         mutex_exit(&rdmadupreq_lock);
1271                         return (DUP_ERROR);
1272                 }
1273         }
1274         dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1275         bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1276         if (dr->dr_resp.maxlen < size) {
1277                 if (dr->dr_resp.buf != NULL)
1278                         kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1279                 dr->dr_resp.maxlen = (unsigned int)size;
1280                 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1281                 if (dr->dr_resp.buf == NULL) {
1282                         dr->dr_resp.maxlen = 0;
1283                         dr->dr_status = DUP_DROP;
1284                         mutex_exit(&rdmadupreq_lock);
1285                         return (DUP_ERROR);
1286                 }
1287         }
1288         dr->dr_status = DUP_INPROGRESS;
1289 
1290         drhash = (uint32_t)DRHASH(dr);
1291         dr->dr_chain = rdmadrhashtbl[drhash];
1292         rdmadrhashtbl[drhash] = dr;
1293         rdmadrhashstat[drhash]++;
1294         mutex_exit(&rdmadupreq_lock);
1295         *drpp = dr;
1296         return (DUP_NEW);
1297 }
1298 
1299 /*
1300  * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1301  * and stores the response.
1302  */
1303 static void
1304 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1305     int size, int status)
1306 {
1307         ASSERT(dr->dr_resfree == NULL);
1308         if (status == DUP_DONE) {
1309                 bcopy(res, dr->dr_resp.buf, size);
1310                 dr->dr_resfree = dis_resfree;
1311         }
1312         dr->dr_status = status;
1313 }
1314 
1315 /*
1316  * This routine expects that the mutex, rdmadupreq_lock, is already held.
1317  */
1318 static void
1319 unhash(struct dupreq *dr)
1320 {
1321         struct dupreq *drt;
1322         struct dupreq *drtprev = NULL;
1323         uint32_t drhash;
1324 
1325         ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1326 
1327         drhash = (uint32_t)DRHASH(dr);
1328         drt = rdmadrhashtbl[drhash];
1329         while (drt != NULL) {
1330                 if (drt == dr) {
1331                         rdmadrhashstat[drhash]--;
1332                         if (drtprev == NULL) {
1333                                 rdmadrhashtbl[drhash] = drt->dr_chain;
1334                         } else {
1335                                 drtprev->dr_chain = drt->dr_chain;
1336                         }
1337                         return;
1338                 }
1339                 drtprev = drt;
1340                 drt = drt->dr_chain;
1341         }
1342 }
1343 
1344 bool_t
1345 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1346 {
1347         struct clist    *clist;
1348         uint32_t        tlen;
1349 
1350         if (req->rq_xprt->xp_type != T_RDMA) {
1351                 return (FALSE);
1352         }
1353 
1354         tlen = 0;
1355         clist = wlist;
1356         while (clist) {
1357                 tlen += clist->c_len;
1358                 clist = clist->c_next;
1359         }
1360 
1361         /*
1362          * set iov to addr+len of first segment of first wchunk of
1363          * wlist sent by client.  krecv() already malloc'd a buffer
1364          * large enough, but registration is deferred until we write
1365          * the buffer back to (NFS) client using RDMA_WRITE.
1366          */
1367         iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1368         iov->iov_len = tlen;
1369 
1370         return (TRUE);
1371 }
1372 
1373 /*
1374  * routine to setup the read chunk lists
1375  */
1376 
1377 int
1378 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
1379 {
1380         int             data_len, avail_len;
1381         uint_t          round_len;
1382 
1383         data_len = avail_len = 0;
1384 
1385         while (wcl != NULL && count > 0) {
1386                 if (wcl->c_dmemhandle.mrc_rmr == 0)
1387                         break;
1388 
1389                 if (wcl->c_len < count) {
1390                         data_len += wcl->c_len;
1391                         avail_len = 0;
1392                 } else {
1393                         data_len += count;
1394                         avail_len = wcl->c_len - count;
1395                         wcl->c_len = count;
1396                 }
1397                 count -= wcl->c_len;
1398 
1399                 if (count == 0)
1400                         break;
1401 
1402                 wcl = wcl->c_next;
1403         }
1404 
1405         /*
1406          * MUST fail if there are still more data
1407          */
1408         if (count > 0) {
1409                 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
1410                     int, data_len, int, count);
1411                 return (FALSE);
1412         }
1413 
1414         /*
1415          * Round up the last chunk to 4-byte boundary
1416          */
1417         *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
1418         round_len = *wcl_len - data_len;
1419 
1420         if (round_len) {
1421 
1422                 /*
1423                  * If there is space in the current chunk,
1424                  * add the roundup to the chunk.
1425                  */
1426                 if (avail_len >= round_len) {
1427                         wcl->c_len += round_len;
1428                 } else  {
1429                         /*
1430                          * try the next one.
1431                          */
1432                         wcl = wcl->c_next;
1433                         if ((wcl == NULL) || (wcl->c_len < round_len)) {
1434                                 DTRACE_PROBE1(
1435                                     krpc__e__rdma_setup_read_chunks_rndup,
1436                                     int, round_len);
1437                                 return (FALSE);
1438                         }
1439                         wcl->c_len = round_len;
1440                 }
1441         }
1442 
1443         wcl = wcl->c_next;
1444 
1445         /*
1446          * Make rest of the chunks 0-len
1447          */
1448 
1449         clist_zero_len(wcl);
1450 
1451         return (TRUE);
1452 }